Netty


Author
|
Earl
Describe
|
该文档介绍JDK的BIO、NIO和AIO通信模型,介绍基于NIO的网络通信框架Netty的组成、黏包半包处理、协议处理、消息通信应用搭建以及远程资源调用应用的搭建并解析Netty重要组件的部分源码
Reference
|
Last Update
|
2024-8-15

 

BIO、NIO & AIO

 

BIO

  1. 概念:传统同步阻塞式通信模式,服务器每次都会分配一个线程处理一个客户端连接的读写数据

  2. 适用场景:适合连接数较小且连接数目固定的系统架构,因为每次请求都要创建一个线程进行处理,对服务器性能要求高,JDK1.4以前的唯一选择,优点是程序编写起来比较简单

  3. 特点:

    • 服务端每次接收到socket连接请求时都需要创建一个线程单独处理socket,频繁上下文切换[保存和恢复线程当前的执行状态]会影响系统性能,客户端一旦并发访问请求突增,服务端线程开销会同比例突增,而且这些线程全部是同步阻塞式的线程[windows下64位JVM默认一个线程会占用1M内存],即客户端没有传输任何数据的时候,线程仍然阻塞等待客户端数据,不能做其他任何事情,系统很容易发生线程栈溢出,线程创建失败最终导致进程宕机或者僵死,无法对外提供服务

      • 解释例子:一个socket连接就是一个客人,一个客人如果需要一个服务员专门服务,客流量太大,要招聘同等数量的服务员,餐馆就要倒闭;而且厨房就只能容纳一定数量的服务员,必须等部分服务员出了厨房下一批服务员才能进入厨房,服务员一直等拿不到下一个菜客户就等干等着

      • 阻塞模式的解释例子:服务员绑定一个客人,即使客人翻菜单没有点菜期间服务员也得等不能去做其他事

    • 服务端程序在接受客户端下一条数据上传请求到来前,当前线程都会进行无意义的等待,因此早期的服务器一般都设计为线程池配合短连接[socket连上以后做完一项业务需要赶紧把链接断开把线程资源让出来]

    • 优点是能解决多种客户端的消息通信

    • Java中原生的字节流、字符流都是BIO操作方式

  4. BIO模式使用要点:[其实就是socket结合IO流可以在客户端和服务器端随意收发消息,我们在服务器内部使用集合管理socket,结合线程池使用单独一个线程处理单个socket连接接收客户端消息,只要等待读取客户端消息阻塞过程]

    • 接收并处理客户端Socket连接对象

      • 服务端调用Socket socket=serverSocket.accept()阻塞等待全新客户端socket连接请求[一个客户端连接请求被监测到在代码中被封装为一个socket对象],客户端尝试与服务端建立连接后该方法返回socket对象并创建一个新线程将socket交给该线程处理,以后只要客户端该socket连接没有关闭,该客户端的所有消息通信都会直接通过该socket传输并被分配的线程独立处理,如果有新的socket请求到来或重复上述过程[注意单线程监听socket消息时会阻塞,socket.accept()监听新的客户端socket连接时也会阻塞,这两者不可调和的矛盾处于同一个线程会导致要么无法正常监听已连接的socket消息或者无法接收新的socket连接请求]

    • BIO模式下,客户端使用特定输出流如DataInputStream上传数据,服务端也必须使用对应的输入流DataOutputStream接收数据,这个输入输出流是socket对象自带的

    • 文件数据传输前需要先dataOutputStream.writeUTF(".png")通过输入流向服务端写入文件后缀名,服务端通过dataOutputStream.readUTF()后缀名识别文件类型并将数据保存在指定后缀名的文件中

    • 客户端发送完数据以后要调用socket.shutdownOutput()来通知服务端退出读数据循环,没有该通知服务端会一直阻塞在原生流读取如dataOutputStream.read(buffer)最后一次循环读取文件但是缓冲区读不满,dataOutputStream.read(buffer)因为没有读满且读取时抛出异常缓冲区数据没有刷新到本地导致文件数据不完整,与缓冲区倍数大小的文件则能正常写入完整文件,但是没有socket.shutdownOutput()通知服务端断开连接服务端都是通过抛异常的方式跳出读客户端数据循环

  5. 客户端与服务端通信

    • 服务端通过new ServerSocket(9999)指定服务端socket通信端口,手动调用Socket socket=serverSocket.accept()来等待客户端的全新socket请求,等到请求后返回与客户端建立连接的socket并将socket交给单独的线程处理,线程获取到socket对象的输入流并处理输入流中的数据,以后该socket连接只要没有关闭,该socket的所有消息通信都会直接通过该socket传输并被分配的线程独立处理,如果有新的socket请求到来或重复上述过程,使用线程池来限制socket请求连接数量避免服务器资源被耗尽

    • 客户端通过Socket socket = new Socket("127.0.0.1",9999)向指定服务器socket通信端口建立连接,通过socket.getInputStream()获取的字节输出流或者包装字节输出流的处理流将数据输出到服务器,写数据前需要如dataOutputStream.writeUTF(".png")手动告知服务端数据类型,通过处理流如dataOutputStream.write(buffer,0,len)来向服务端写出数据

  6. 客户端与客户端的端对端通信

    • 服务端通过new ServerSocket(9999)指定服务端socket通信端口,需要手动调用Socket socket=serverSocket.accept()来等待客户端的全新socket请求,等到请求后返回与客户端建立连接的socket,将socket对象存入集合并将socket交给单独的线程处理,线程获取到socket对象从输入流中读取到数据,并从socket集合中获取到目标socket对象,使用socket对象的输出流将消息发送给一个或者多个目标客户端,处理socket的线程在读取数据过程中如果客户端下线了,socket连接断开,服务端读取过程中检测到连接断开会抛出异常,抛出异常时我们就可以从集合中移除对应的socket对象

     

NIO



 

  1. 概念:同步非阻塞式通信模式,服务器专门分配一个线程处理多个客户端连接请求,客户端发送的请求连接通道都会注册到一个多路复用器/选择器上,多路复用器会自动轮询所有通信管道检查哪些通道触发了可连接、可读、可写事件,有事件发生选择器配合一个线程来处理通信请求,如果管道没有数据线程就会去做其他事情;NIOChannel通道、Buffer缓冲区、Selector选择器构成

    • 适用场景:适合连接数多且连接时间比较短即数据量比较低的系统架构,比如聊天服务器、弹幕系统、服务器间通讯。程序编辑起来比较复杂;NIO的编程在生产中比较复杂,非常容易出现bug造成系统风险,特别是大型服务器前期投入成本和后期维护成本非常高,一般都使用对NIO代码进行包装的更高级的网络通信框架Netty来解决

    • 特点:

      • BIO相关的类都在java.io包下,BIO基于字节流和字符流即流的数据处理方式进行数据传输的同步阻塞式IO;NIO是Java1.4引入的新的IOAPI,相关的类都在java.nio包下,并且对原java.io包下的很多类都进行了改写,NIO基于通道和缓冲区即块的数据处理方式进行数据传输的非阻塞式IO[非阻塞体现在线程不会在连接未断开但无事发生期间死等客户端请求],数据总是从通道读取到缓冲区,或者从缓冲区写入通道,通道负责数据的传输,缓冲区负责数据的读写,NIO的缓冲区既可以读也可以写,BIO中的IO流要么只能读要么只能写

      • NIO每个通道对应一个缓冲区,一个线程对应一个选择器,一个选择器对应多个通道,使用选择器轮询所有通道,只有通道上有读写数据事件发生的情况下才会触发一个线程处理该通道的数据,NIO要求每个通信请求的操作时间要短,因为NIO是通过选择器轮询每个通道的方式来处理通道的数据传输,如果通道的数据太多会长期占用当前线程,导致处理各个通道消息的性能降低

      • 解释案例:一个服务员管理多个注册的餐桌,餐桌上的客人有任何需求直接通知服务员

  2. NIO的一般使用流程:

    • 定义一个输入流或者输出流与源文件接通

    • 通过输入流或者输出流的getChannel()方法获取通道

    • XxxxBuffer.allocate(int capacity)或者XxxxBuffer.allocateDirect(int capacity)定义一个缓冲区,写出数据时先将数据写入到缓冲区

    • 通过通道的channel.write(ByteBuffer src)来将数据写出

    • 读数据时通过通道的channel.read(buffer)将数据读取到缓冲区

  3. 网络编程中的黏包和半包

    • 概念:黏包和半包现象在网络编程中很常见,这种现象表现为原本使用指定标识符如\n分隔的多条数据由于某些原因在数据接收时被重新组合,多条消息被组合在一起就是黏包,一条消息被截断成两个部分就是半包,具体示例如下:

      [网络数据]

      [接收后的处理数据]

    • 原因:

      • 黏包和半包的原因是网络数据传输把多条消息封装成一个整体一起发送效率更高,这就和快递员投递货物一样,攒一小车货物再去送货效率更高

      • 黏包现象就是因为一次性把多条消息拼接在一起发送产生的;

      • 半包现象是因为服务器的缓冲区大小导致的,缓冲区大小有限制,网络数据只能使用固定大小的缓冲区分多次读取出来,传输的数据就可能在缓冲区边界处发生半包现象

    • 业务需求

      • 将缓冲区从套接字中接收到的黏包半包消息还原成以/n作为分隔符的多条消息

    • 实现逻辑[这段处理在网络编程中处理黏包半包现象很重要的操作,Netty已经将该操作封装了]

      • 遍历缓冲区的每个字节,找到缓冲区数据的第一个换行字符byteBuffer.get(i)=='\n',准备一个对应第一条数据长度的缓冲区,将缓冲区的第一条数据读出;继续遍历缓冲区的换行字符后续的字节,直到第二个换行符,读出第二条数据;依次类推直到读出最后一条带换行符的数据;然后调用byteBuffer.compact()方法将所有已经读出数据覆盖

      • 最后一条处于半包状态的数据因为没有找到换行符不会执行读出操作,执行完第一步以后再用保存了半包数据的缓冲区继续去读套接字通道中的数据,再对缓冲区执行第一步操作

      • 循环执行上面两步操作直到套接字中的数据被全部读出

    • 更高效的实现逻辑

  4. 消息边界问题

    • 现象:Win10的默认字符集是UTF-8,我们如果socketChannel.write(Charset.defaultCharset().encode("中国"))写入数据使用操作系统默认字符集UTF-8来对字符进行编码,一个汉字会被编码成三个字节,一个英文或者数字占一个字节;如果缓冲区不够大不能一次性容纳所有数据,很容易发生单次写入缓冲区的数据最后一个字符不完整,如果读取一次就直接将全部字节数据转换成字符最后一个字符会因为字节不完整转换为乱码,第二次读取到缓冲区的数据因为第一个字符的字节就不对导致后续数据全部乱码,这是没有正确处理消息边界产生的问题

    • 原因:网络中消息的实际长度一般不是固定的,但是服务端的缓冲区的长度一般是固定的,随消息长度的不同就可能发生半包、黏包现象;注意该现象是没有使用分隔符区分每条消息的情况

      • 实际上这个问题不止NIO要考虑,BIO也要考虑

    • 解决方案:

      • 方案1️⃣:客户端和服务端都约定使用相同长度的缓冲区收发消息,即数据包大小是恒定的

        • 缺点是所有消息都使用固定长度的缓冲区收发,可能会导致内存空间的浪费,而且浪费带宽,这种方式基本不使用

      • 方案2️⃣:每条完整的消息都使用特定的分隔符进行分隔,使用每条消息对应长度的缓冲区读取消息,原缓冲区继续带着最后一条不完整的消息继续去通道中读取后续消息并重复上述过程

        • 缺点:如果单条消息就比缓冲区更长需要单独进行处理,而且这种方式效率不高,因为要检查哪些位置是分隔符才能确定写出消息的缓冲区大小,有了缓冲区才能写数据,比直接写出多了一倍的消耗,这种方式用的不是很多

        • 需要对一条消息超出缓冲区容量的场景专门进行处理,处理办法是缓冲区作为局部变量生命周期太短,在一条未读完的消息读取剩余部分上一条消息的缓冲区就被释放了,解决办法是在通道注册时为通道绑定一个缓冲区将缓冲区的生命周期延长成和通道一样,如果缓冲区大小不够重新创建一个新的容量为原来两倍的缓冲区来接收数据并selectionKey.attach(byteBuffer)重新绑定为通道的附件;判定缓冲区需要扩容的时机为byteBuffer.compact()后缓冲区的position仍然和limit相等即缓冲区覆盖已写出数据后仍然无法写入数据时

        • Netty做的更精细,通道特有的ByteBuffer是一个容量自适应的缓冲区,如果发现消息长度越来越小会将缓冲区的长度也变小

      • 方案3️⃣:在每条消息的开头使用固定字节长度专门存储紧接着一条消息的字节数,后面紧跟消息类型和消息本身;服务端根据消息开头的字节数量开辟对应的缓冲区,一次遍历就将单条消息写入缓冲区,这种方式叫LTV格式[L表示长度、T表示消息类型、V表示消息实际内容]

        • HTTP2.0协议就是LTV格式,HTTP1.1TLV格式[先传输消息类型即http请求头中的content-type、再传输消息长度即请求头中的content-length最后传输消息本身即请求体]

 

缓冲区

  1. 概念:缓冲区是一块可以写入和读取数据的内存,本质上是一个数组,被封装为Buffer对象,提供一组方法方便地访问这块内存,对比对数组的操作更容易

    • 缓冲区负责与NIO通道进行交互,数据从通道读入缓冲区,从缓冲区写入通道,缓冲区直接和网络套接字socket交互[这个交互过程被封装到通道写入或者读取缓冲区数据的方法中],通道不直接与网络套接字socket交互

    • Buffer的子类:ByteBufferCharBufferShortBufferIntBufferLongBufferFloatBufferDoubleBuffer,这些子类因为继承相同的父类都采用相似的方法来管理缓冲数据,只是管理的数据类型不同,可以通过XxxxBuffer.allocate(int capacity)来创建一个指定容量的非直接缓冲区对象,可以通过XxxxBuffer.allocateDirect(int capacity)来创建一个指定容量的直接缓冲区对象;

      • 最常用的是ByteBuffer,这是一个抽象类,子实现类包括MappedByteBufferDirectByteBufferHeapByteBuffer,其他的缓冲区都用的比较少

    • 缓冲区的基本属性

      • capacity容量:表示缓冲区内存块的固定大小,不能为负,创建以后不能更改

      • limit限制:表示缓冲区中可以被操作即读写数据的容量大小,当缓冲区写入数据时,limit应该等于缓冲区的容量,当从缓冲区读取数据时,limit设置为缓冲区实际写入的数据量,limit不能为负且不能大于缓冲区容量,默认初始limit在容量对应的索引下标处[比如初始容量为10,初始limit的值就为10,但是实际最大能写的索引为9]

      • position位置:下一个要读取或者写入数据的索引,初始位置为0,位置不能为负且不能大于缓冲区容量,注意写完数据position的位置为最后一个写入数据的下一位,写完数据后我们可以通过缓冲区的flip()方法将position属性的值重置为索引0处,因此flip()方法也可以看做是缓冲区写完数据切换为读数据模式[因为flip()方法在将position重置为0前还将limit限制在了读取数据的最后一位,因此切换为写模式必须调用clear()方法让position重置为0的同时还将limit重置为capacity]

      • mark标记:标记也是一个索引,当position在某个位置时可以通过缓冲区对象的mark()方法用mark标记记录position的位置

      • reset重置:通过调用缓冲区对象的reset()方法,我们可以将属性postion重置到标记mark

  2. 缓冲区的一般用法

    • 调用channel.read(buffer)向缓冲区写入数据

    • 调用buffer.flip()将写完数据的缓冲区切换为读模式

    • 调用buffer.get()或者buffer.get(byte[] dst)读取数据

    • 调用clear()或者compact()将缓冲区切换为写模式

    • 重复上述步骤知道客户端数据完全上传[注意客户端数据上传在服务端完全接收以前也是阻塞的]

  3. 缓冲区常用方法

    • Buffer clear():清空缓冲区并返回缓冲区的引用,注意这个清空并不是真的清空缓冲区的数据,只是position设置为0,之后添加数据的时候才会去覆盖缓冲区的数据,注意该方法还会将limit设置为capacity;在读取一个文件时,一个缓冲区很可能读不完一个文件的内容,此时就需要使用缓冲区循环读取文件,下一次循环读取文件时一定要调用该方法让position的位置重新变成0让数据写入缓冲区直接覆盖掉缓冲区中旧的内容

    • Buffer flip():将缓冲区的limit设置为当前position的位置,将position设置为0准备读取或者写入数据

    • Buffer limit(int n):设置缓冲区的limitn

    • Buffer mark():将缓冲区当前position的值赋值给mark

    • Buffer reset():将position的位置设置到此前设置的mark标记位置处,和mark()get()结合使用一般用于重复读取缓冲区中的部分重要数据

    • int position():获取缓冲区当前的position

    • Buffer position(int n):将position的值赋值为n

    • int remaining():返回positionlimit之间的元素个数

    • get():读取当前位置处的单个字节

    • Buffer get(byte[] dst):批量读取多个字节到字节数组dst中,注意这个读取的数据会直接保存在字节数组dst中,该方法的返回值是当前缓冲区对象的引用,注意缓冲区内必须有数据才能读取,否则调用该方法会直接抛异常

    • get(int index):读取指定索引位置的字节,这个方法不会更改属性position的值[即不会移动读指针]

    • put(byte b):向缓冲区当前位置写入单个字节

    • put(byte[] src):将字节数组中的字节写入缓冲区的当前位置

    • put(int index,byte b):将指定字节写入缓冲区的指定索引位置,该方法不会更改属性position的值

    • byteBuffer.put(ByteBuffer source):将旧缓冲区source中的数据写入新的缓冲区byteBuffer

    • boolean isDirect():判断缓冲区是否直接缓冲区

    • byte[] array():将缓冲区的数据输出为字节数组

    • ByteBuffer ByteBuffer.wrap(string.getBytes()):将字符串对象string封装成ByteBuffer对象,也是将数据写入缓冲区

    • compact():在缓冲区还没有读完的情况下,缓冲区positionlimit之间的数据还没有读出,此时需要立即使用该缓冲区来继续写数据,需要调用该方法将缓冲区中未读的数据依次移到缓冲区的开头腾出更多的空间且同时将position移到未读数据最后一位的下一位,limit重置为capacity来准备写数据,即只清除已读数据,并从未读数据的最后一位的下一位准备写入数据

    • rewind():将position位置重置为0,把标记mark设为-1,一般用于重复读取缓冲区的场景

  4. 字符串和ByteBuffer的互转

    • byteBuffer.put(string.getBytes()):将字符串string转成字节数组写入ByteBuffer对象中,方法string.getBytes()默认使用的就是操作系统的默认编码格式,注意该方法向缓冲区写入数据以后不会调用flip()方法将缓冲区切换为读模式

    • ByteBuffer ---> StandardCharsets.UTF_8.encode("hello"):使用指定编码格式来将字符串"hello"转换为byteBuffer对象,注意该方法向缓冲区写入数据以后调用flip()方法将缓冲区切换为读模式

      • Charset ---> Charset.defaultCharset():获取操作系统的默认字符集,Charset本身是一个字符集类,Charset本身就可以处理字符串与ByteBuffer之间的相互转换

    • ByteBuffer ---> ByteBuffer.wrap("hello".getBytes())wrap方法是ByteBuffer缓冲区提供的一个方法,主要是用于字节数组和ByteBuffer之间做包装转换,注意该方法向缓冲区写入数据以后调用flip()方法将缓冲区切换为读模式

    • String ---> StandardCharsets.UTF_8.decode(byteBuffer).toString():将字节缓冲区中position后面的数据按UTF-8的编码格式转换为字符串,注意只有byteBuffer处于读模式才能调用该方法来转换字符串,如果byteBuffer处于写模式最终得到的结果是空串

  5. 直接缓冲区[HeapByteBuffer]和非直接缓冲区[DirectByteBuffer]

    • 直接内存:非堆内存[操作系统的内存],堆内存在应用空间;直接内存JVM在IO操作上具有更高的性能,操作系统的IO操作可以直接对直接内存进行读写,但是无法对堆内存中的数据直接进行读写,要对堆内存的数据进行读写必须将堆内存的数据拷贝到直接内存;注意直接内存在数据进行IO操作时速度比较快,但是直接缓冲区[系统内存]在申请创建的时候会耗费更高的性能,而且使用不当没有合适的释放会导致内存泄漏,Netty对直接内存进行了封装,采用对象池的手段对内存进行分配,尽可能减少对直接缓冲区对象内存分配的频率,在读写效率和分配效率上都有更好的表现,同时也通过对象池对使用完的缓冲区对象进行回收,减少内存泄漏的机会;但是这部分数据在JVM之外,不会占用应用内存,因此直接内存适合缓存生命周期很长的大量数据,比如网络并发场景涉及到频繁的IO操作、生命周期很长,数据量很大的IO操作可以考虑使用直接内存即直接缓冲区,如果只是一般的应用,并不需要IO操作具有很高的性能,还是推荐使用堆内存

    • 非直接内存:堆内存,应用内部的IO要写入数据到当前进程中,会先将数据写入直接内存,再将直接内存的数据转移到应用进程的内核内存空间[也叫应用内核,因为IO流是操作系统级别的,只能将数据写入到操作系统对应的内存空间再转移到堆内存空间来让程序使用]

      • 堆内存还会因为GC过程通过数据移动以及整理算法让内存区域更紧凑,减少内存碎片;这种移动内存数据会拉低读取写入数据的性能,因为GC会暂停整个系统的运行,但是直接内存的数据不会受到GC的影响,IO过程也不会受到GC的影响

  6. 缓冲区大小分配

    • 每个通道都需要处理可能发生的半包问题,ByteBuffer不能同时被多个通道共同使用,因此每个Channel都应该维护一个独立的可变长度的ByteBuffer,注册通道时可以同时将该ByteBuffer注册为通道的附件

    • ByteBuffer容量要可变,而且容量要自适应消息大小,可以参考Netty的实现,常用设计思路如下

      • 1️⃣:缓冲区比单条消息短,将缓冲区扩容一倍并将原来的半包数据拷贝到新缓冲区并用该缓冲区接收后续消息,优点是消息都处于一个缓冲区,处理逻辑简单;缺点是拷贝浪费性能

      • 2️⃣:使用多个数组组成缓冲区,一个数组不够将多出的数据写入新的数组,缺点是消息不连续解析复杂,优点是避免拷贝引起的性能损失,Netty中的CompositeByteBuffer就是基于该思想实现的

  7. 单次网络数据传输量是有上限的

    • 单次网络数据传输量和操作系统底层分配的socket缓冲区有关,该socket缓冲区占满了就会结束数据发送放方法的执行并只传输写入socket缓冲区的数据,此时socketChannel.write(byteBuffer)返回实际发送的数据字节数,socket缓冲区分配的大小不是固定的,范围大概在2M到8M之间[怀疑socket缓冲区是固定大小,只是当前程序每次能写入的数据量不同,因为还有其他程序竞争],只有socket缓冲区清空了才能继续发送下一条消息

      • 操作系统socket缓冲区满了程序去处理读事件:如果我们写死循环发送消息,如果遇到大消息会发生多次循环尝试完整写出数据,但是期间socket端口缓冲区还没准备好无法发送消息导致CPU大量空转的情况,一般NIO只用一个线程管理多个通道的读写,如果大文件传输过程中,单个线程一直尝试发送但是操作系统底层的socket缓冲区又没有准备好,这些尝试消息并不能成功发送出去,就形成了CAS自旋阻塞的效果;这不符合NIO的设计思想,我们应该把程序设计为socket缓冲区满了让当前线程停止发送消息,继续去监听其他读事件,等socket缓冲区清空了触发一个写事件把剩下的数据再次尝试写入通道,不要在socket缓冲区清空期间尝试将数据写入通道,写了也是白写;

        • 注意可写事件是要缓冲区有数据且socket缓冲区就绪的情况下就会触发可写事件,即缓冲区一次写入数据超出socket网络传输上限,等socket缓冲区清空以后byteBuffer中因为还有数据会自动触发该通道的可写事件

        • 因此如果缓冲区数据一次不能完全写入通道,我们可以让事件监听对象selectionKey监听通道的可写事件,同时将未写完的数据以附件的形式selectionKey.attach(byteBuffer)存入selectionKey,让当前线程继续阻塞在selector.select()监听处理其他通道,等操作系统socket缓冲区清空以后因为byteBuffer中还有数据会自动触发可写事件,此时再继续写入数据即可

        [CAS自旋尝试发送消息]

        [可写事件触发发送消息]

         

 

 

通道

  1. 概念:通道表示打开到文件或者套接字的连接,是数据传输的通道,通道类似于流但又和流不同,流只是单向的,一种输入流或者输出流只能读取或者写出数据,但是通道既可以读数据也可以写数据,而且通道可以以非阻塞的方式从缓冲读取数据或者向缓冲写入数据,而且这个读写过程不需要系统内部的程序去负责,会被系统交给操作系统底层去异步地执行,执行结束后通过回调来通知系统做下一步处理;实际上通道Channel和网络中的套接字端口是不能直接关联的,应该由缓冲区做中转,由缓冲区直接和通道交互

    • 通道可以同时读写,流只能读或者只能写

    • 通道可以异步读取数据,选择器对应的线程可以在通道读取数据期间去做其他事,由操作系统来负责读写数据的异步执行

    • 通道只能通过输出流或者输入流或者随机读写流的getChannel()方法来获取,不能直接通过实例化Channel对象来获取

      • 输出流获取fileOutputStream.getChannel(),或者通过输入流获取fileInoputStream.getChannel(),输出流获取的通道只能写,输入流获取的通道只能读;随机读写流获取的通道能否读写要根据随机读写流构造时设置的读写模式来决定[如果随机读写流的模式为rw,那么获取到的通道既可以读也可以写]

      • 通道使用后必须调用channel.close()来关闭以节省服务器资源,输出流输入流或者随机读写流的close()方法也会自动调用channel.close()来关闭对应的通道,一般建议使用JDK7try(resource1;resource2)程序编译时自动在finally块中调用资源的close()方法来释放资源

  2. 通道在NIO中是一个接口,通道的常用实现类如下

    • FileChannel:读取、写入、映射、操作文件的文件通道;

      • 🔎:注意FileChannel只能工作在阻塞模式下,即不能配合选择器selector一起使用;只有和网络相关的通道如SocketChannelServerSocketChannel才能工作在非阻塞模式下即配合选择器selector一起使用

      • 常用方法

        • int read(ByteBuffer dst):从通道中读取数据到字节缓冲区,该方法返回读取到缓冲区数据的长度,当没有读取到任何数据时返回-1,读取到的结果缓存在byteBuffer

        • long read(ByteBuffer[] dsts):分散读,从通道中读取数据分散到多个字节缓冲区中,这个效果和循环使用一个缓冲区读取一个文件的效果是相同的,就是将一个数据依次写入到多个字节缓冲区中,缓冲区写入的先后顺序按照数组中索引的先后顺序,所有的字节缓冲区中的数据组成整个文件,分散读的目的是避免将通道的数据读取到一个缓冲区再将缓冲区的数据读取封装到多个缓冲区,而是直接从通道中一次性将数据直接读取到多个缓冲区,减少数据的无效处理提高业务效率

        • int write(ByteBuffer src):将缓冲区的数据写入到通道,返回写入的字节数

          • 注意写之前通过buffer.flip()position设置到缓冲区的索引0处准备将数据从缓冲区写入到通道

          • FileChannel对应的IO通道可以写入的数据是不受限制的,缓冲区中有多少数据就能一次性向文件中写入多少数据;但是网络传输通道套接字不是这样的,套接字数据传输数据大小有限制,并不是缓冲区数据有多少,套接字就能一次性向外传输多少

          • 注意操作系统处于性能考虑,文件通道中的数据不是直接写入到磁盘文件,而是写入操作系统的缓存中,当channel关闭时才会将缓存中的数据同步到磁盘文件,也可以调用channel.force(true)手动将文件数据和元数据[文件的权限、创建时间和修改时间等信息]立刻写入磁盘文件,channel.force(true)调用多了会对性能有影响

        • long write(ByteBuffer[] srcs):集中写,将多个缓冲区中的数据依次写入到通道,也就是直接写出到文件IO通道,缓冲区的写出顺序按照缓冲区数组的索引从小到大,集中写的目的是避免将多个缓冲区的数据复制到一个缓冲区中再将该缓冲区的数据写入通道,而是直接将多个缓冲区中的数据一次性按数组顺序写出到一个通道,减少数据的无效处理提高业务效率

        • long position():返回此通道的当前读写指针位置

        • FileChannel ---> position(long p):设置此通道的当前的读写指针位置为新位置

        • long size():返回此文件通道对应文件的大小

        • FileChannel truncate(long s):将通道的文件截取为给定大小

        • void force(boolean metaData):文件通道的数据在通道关闭前不会立即同步到磁盘,而是先存入操作系统缓存待文件关闭时一次性写入磁盘;调用该方法可以手动将文件数据和元数据[文件的权限、创建时间和修改时间等信息]立刻写入磁盘文件,channel.force(true)调用多了会对性能有影响

        • osChannel.transferFrom(isChannel,isChannel.position,isChannel.size()):将输入通道isChannel中的数据复制到输出通道osChannel中,isChannel.position是原通道的起始位置

        • long ---> isChannel.transferTo(isChannel.position(),isChannel.size(),osChannel):将输入通道isChannel中的数据复制到输出通道osChannel中,isChannel.position()是数据起始位置,isChannel.size()是要传输的数据量;这种文件复制方式比我们自己使用文件输入输出流复制效率更高,底层会利用操作系统的零拷贝进行优化;而且编码非常简洁;该方法的返回值是实际传输的字节数

          • 该方法一次调用能复制的数据量最大为2G,超过2G多出的数据量一次调用无法被写入新文件,我们可以以文件初始大小作为数据量初始值left,每次传输后都使用left减去该方法的返回值即实际传输字节数,如果left大于0说明数据还没有被传输完,传输时调用方法left-=isChannel.transferTo(isChannel.size()-left,left,osChannel)

        • getRemoteAddress():获取连接通道的客户端地址

        • close():关闭通道

    • DatagramChannel:做UDP网络通信时的网络数据传输通道

    • SocketChannel:做TCP网络通信时的网络数据传输通道,客户端通过该通道和服务端进行数据读写和传输

      • socketChannel.configureBlocking(false)设置服务器的socket请求对应的通道模式为非阻塞模式,该方法的作用是让socketChannel.read(ByteBuffer buf)变成非阻塞模式[阻塞是如果客户端没有写数据服务端线程就完全暂停,非阻塞是如果客户端没有向服务端写数据服务端线程还会继续向下运行,非阻塞模式下如果没有检测到客户端写数据请求该方法返回0,如果检测到客户端的写数据请求该方法返回写入缓冲区的字节数]

      • socketChannel.close():客户端或者服务端关闭指定网络通信通道,注意不管调用socketChannel.close()正常断开连接还是客户端宕机暴力断开连接,都会在服务端对应的通道中触发一个可读事件,只是异常断开的可读事件在调用socketChannel.read(byteBuffer)处理过程中直接抛异常;正常断开的可读事件在调用socketChannel.read(byteBuffer)处理过程中不会抛异常,但是与正常返回写入缓冲区的字节数不同,此时该方法直接返回-1并且也无法处理该可读事件,此时我们必须通过返回值-1判断客户端连接正常断开并调用selectionKey.cancel()取消对应通道在选择器中的注册

      • socketChannel.read(ByteBuffer buf)等到并读取客户端套接字数据到缓冲区,阻塞模式下该方法会阻塞当前线程直到有客户端数据传输到服务端,该方法返回从通道读取到缓冲区的字节数;非阻塞模式下该方法不会阻塞当前线程并返回0;不论是非阻塞还是阻塞模式下,客户端在没有通知服务端的情况下断开连接,服务端都会在执行socketChannel.read(ByteBuffer buf)直接抛出异常并终止当前线程的执行

        • 注意即使没有执行该方法,客户端的消息仍然能到达服务端,只是需要调用该方法从套接字中将消息写入缓冲区

        • 客户端异常断开触发可读事件,该方法无法处理可读事件并抛出异常;客户端调用socketChannel.close()正常断开也会触发可读事件,该方法无法处理可读事件并返回-1

        • 注意如果客户端没有通知服务端就断开连接,客户端通道会触发一个可读事件,但是我们在调用socketChannel.read(ByteBuffer buf)处理可读事件时会直接抛出异常[因此该可读事件是无法处理的],如果我们只是捕获了异常此时事件并没有被处理,此时事件监听对象selectionKey也在集合seletedKeys中,seletor.select()因为集合selectedKeys中有未被处理的事件仍然不会阻塞当前线程,再次对未处理的selectionKey读事件进行处理,但是还是抛异常被捕获再次进入循环从而造成死循环;因此客户端连接断开触发通道读事件并且在处理读事件期间抛出异常并捕获到异常后一定要调用selectionKey.cancel()将当前无法处理的可读事件取消掉,该方法的作用是将当前selectionKey从被注册的选择器中移除掉

      • socketChannel.open(new InetSocketAddress("127.0.0.1",9999))可以获取客户端服务端数据通信的通道,一般客户端不会涉及到接收多个连接的业务,因此不需要ServerSocketChannel来动态监听客户端的socket请求接入事件,可以直接通过socketChannel.open(new InetSocketAddress("127.0.0.1",9999))来获取指定通道并将其直接注册到选择器中来监听服务端发送来的数据,向服务端写数据可以使用和监听服务端消息相同的通道,但是由于处理用户输入的扫描器会阻塞当前线程,因此需要将监听服务端的数据和向服务端写入数据用不同的线程来进行处理,避免相互干扰

      • SelectionKey ---> socketChannel.register(Selector selector,int ops,Object att):将通道SocketChannel注册到选择器上

        • 参数att表示一个通道附件,将通道socketChannel注册到选择器上时选择器会返回一个和socketChannel一一对应的selectionKey,如果我们还想和通道socketChannel绑定唯一的对象通过通道注册到选择器的同时指定一个通道附件比如解决单条消息超过缓冲区长度使用通道内共享的一个特定可扩容缓冲区byteBuffer来合并太长的一条消息

        • t2线程执行该方法在t1线程selector.select()方法执行并阻塞t1期间t2线程会被该方法阻塞直到selector.select()方法结束

      • socketChannel.write(ByteBuffer buf)向网络传输通道中写数据

        • 网络通道不像文件IO通道一样ByteBuffer中有多少数据都能写入,网络通道的写出能力是有上限的,缓冲区的数据调用一次socketChannel.write(ByteBuffer buf)不一定能将缓冲区数据全部写出,因此socketChannel将缓冲区数据写入通道的正确姿势为[即如果缓冲区中还有数据需要多次调用通道的write(byteBuffer)]

      • NIO模式网络通信下服务端读取客户端数据流程

        • 当选择器selector.select()阻塞直到监听到的事件返回值大于0进入事件处理逻辑

        • 通过selector.selectedKeys().iterator()获取到监听事件的迭代器,如果迭代器中iterator.hasNext()有下一个元素,通过iterator.next()获取到事件SelectionKey对迭代器中的事件遍历来进行处理

        • 判断事件类型,如果selectionKey.isAcceptable()true表示事件类型是客户端连接接收事件,如果selectionKey.isReadable()true表示事件类型是客户端数据读就绪事件[就是客户端数据上传到服务端事件]

        • 如果是客户端连接接收事件通过serverSocketChannel.accept()方法获取到正在连接的客户端通道SocketChannel,使用socketChannel.configureBlocking(false)将通道切换为非阻塞模式,使用socketChannel.register(selector,SelectionKey.OP_READ)将通道注册到选择器上并监听通道上的读事件

        • 如果是客户端数据读就绪事件通过(SocketChannel)selectionKey.channel()获取到读就绪状态的通道,准备字节缓冲区ByteBuffer.allocate(1024),通过循环调用socketChannel.read(byteBuffer)来读取数据,每次读取完数据后都调用byteBuffer.flip()position移到缓冲区的头部准备从缓冲区读出数据,读取完数据以后调用byteBuffer.clear()position移动到缓冲区头部准备写入数据,直到socketChannel.read(byteBuffer)返回-1表示没有数据可以读取了跳出循环

        • 迭代器中的一个事件处理完以后在下一个事件处理前调用iterator.remove(),不然当前事件会被重复处理

        • 注意SelectionKey.cancel()是取消事件对应通道在选择器中的注册,socketChannel.close()是关闭当前服务器中的socket通道,注意如果客户端连接断开,服务端的阻塞监听事件代码selector.select()会直接抛出异常,这个异常必须处理,否则会影响到其他通道的监听

      • NIO模式网络通信下客户端向服务端发送数据流程

        • 通过SocketChannel.open(new InetSocketAddress("127.0.0.1",9999))获取通道

          • 也可以通过SocketChannel socketChannel=SocketChannel.open()获取到socketChannel,并通过socketChannel.connect(new InetSocketAddress("127.0.0.1",9999))来获取通道并指定服务端的通信端口

          • 客户端也会随机指定一个端口作为客户端与服务端的通信端口

        • 通过socketChannel.configureBlocking(false)切换非阻塞模式

        • 通过ByteBuffer.allocate(1024)分配指定大小的缓冲区

        • 通过Scanner扫描器接收用户输入数据,通过byteBuffer.put(string.getBytes())将数据刷新到缓冲区,通过byteBuffer.flip()position移到缓冲区的头部准备向通道写入数据

        • 通过socketChannel.write(byteBuffer)将缓冲数据写入通道,写完数据后调用socketBuffer.clear()position移到缓冲区头部准备重新向缓冲区写入数据

        • 直到scanner.hasNext()false时表示数据写入完毕 ,注意scanner会阻塞当前线程,因此如果客户端还要监听服务器的响应数据需要开启一个异步线程来专门处理选择器对服务端的数据读就绪事件

      • NIO模式下服务端转发客户端消息到其他客户端

        • 事件是定位事件所在通道的关键、事件是一个反向代理对象,可以提取当前的客户端通道

    • ServerSocketChannel:做TCP网络通信时的网络数据传输通道,可以监听新进入系统的TCP连接,对每一个新进入的连接都会创建一个SocketChannel[SocketChan nel类似于BIO模式下的SocketServerSocketChannel类似于BIO模式下的ServerSocket]ServerSocketChannel专用于服务端,SocketChannel既可以用于服务端也可以用于客户端

      • 注意通过ServerSocketChannel ssChannel=ServerSocketChannel.open()的方式也能获取通道,该通道的作用是监听socket连接事件并且指定通信端口

      • serverSocketChannel.bind(new InetSocketAddress(9898))可以绑定连接端口,因为这是socket连接对应的通道,网络数据传输需要指定端口

      • SocketChannel ---> serverSocketChannel.configureBlocking(false)设置服务器的socket请求对应的通道模式为非阻塞模式,该方法的作用是让serverSocketChannel.accept()方法变成非阻塞模式[阻塞是如果客户端请求未接入线程就完全暂停,非阻塞是如果客户端请求未接入线程还会继续向下运行,非阻塞模式下如果没有检测到客户端请求接入该方法返回null]

      • SocketChannel ---> serverSocketChannel.accept():在选择器触发接收事件时从ServerSocketChannel中获取SocketChannel客户端连接,该方法在没有检测到客户端的连接请求会直接阻塞当前线程

      • SelectionKey ---> serverSocketChannel.register(Selector selector,int ops,Object att):将通道ServerSocketChannel注册到选择器上,该通道和通道上发生的所有事件都会封装到该方法的返回值SelectionKey对象上

        • ops值为0表示不关注任何事件

        • 注意这个serverSocketChannel对应的SelectionKey能监听所有客户端的连接接入事件,不像SocketChannel对应的SelectionKey只能监听对应通道上的读写事件

      • NIO模式下定义服务端接收socket连接请求的事件[先设置了通信端口监听连接事件才能处理上面SocketChannel中的通道事件]

        • 通过serverSocketChannel.open()获取到通道对象serverSocketChannel

        • 通过serverSocketChannel.configureBlocking(false)将通道切换为非阻塞模式

        • 通过Selector selector = Selector.open()获取选择器

        • 通过serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT)将通道注册到选择器上,并指定监听该通道上的socket请求接收事件[注意这里的选择器selector和上面SocketChannel中的服务端读取客户端数据流程中的选择器selector是同一个选择器]

 

选择器

  1. 概念:选择器能够检查一个或多个NIO通道,确定哪些通道的缓冲区已经准备好数据可以进行读取和写入,利用选择器一个单独的线程可以管理多个通道从而管理多个网络连接从而减少线程开销提高系统效率,选择器就是通道SelectableChannel对象的多路复用器,选择器可以同时监控多个通道的IO状况;通过选择器可以使用一个线程就能管理多个通道,选择器是实现非阻塞IO的核心

    • 选择器检测多个注册通道上是否有事件发生,多个通道以事件的方式注册到同一个选择器上,选择器轮询每个通道,如果通道上有事件就触发相应的机制获取事件针对事件进行相应处理,实现一个线程管理多个通道即多个连接请求;只有连接或者通道上真正有读写事件发生时,线程才会去执行读写操作,避免每个连接都创建一个线程导致线程之间的上下文切换带来额外的开销

    • 通过Selector selector = Selector.open()创建一个选择器,通过SelectableChannel.register(Selector sel,int ops)向选择器中注册通道,第二个参数ops是监听的事件类型,ServerSocketChannel只需要关注连接接入事件SelectionKey.OP_ACCEPTSocketChannel只需要关注可读事件SelectionKey.OP_CONNECT和可写事件SelectionKey.OP_WRITE

      • 事件类型用SelectionKey的四个常量表示通道有数据可以读到缓冲区事件SelectionKey.OP_READ[1]、可写事件SelectionKey.OP_WRITE[4]、连接建立后触发的时间SelectionKey.OP_CONNECT[8]、有连接请求时触发该事件SelectionKey.OP_ACCEPT[16],指定要监听的事件类型的时候使用对应的整数表示,注册时如果要监听不止一个事件,使用"位或"操作符来连接不同的事件类型如int interestSet=SelectionKey.OP_READ|SelectionKey.OP_WRITE,也可以使用+求和的结果来作为入参[比如关注可读和可写事件入参就是5]

      • 注意可写事件是要缓冲区有数据且socket缓冲区就绪的情况下就会触发可写事件,即缓冲区一次写入数据超出socket网络传输上限,等socket缓冲区清空以后byteBuffer中因为还有数据会自动触发该通道的可写事件

    • 选择器必须配合ServerSocketChannelSocketChannel的非阻塞模式一起使用

    • 选择器selector本身维护着一个集合用户保存所有在selector上注册的通道对应的事件SelectionKey,同时选择器还维护着一个集合selectedKeys用户保存当下监测到对应通道有事件发生的selectionKey,注意事件监测对象selectionKey在事件发生后会被选择器主动存入集合selectedKeys,但是选择器不会主动从集合selectedKeys中移除selectionKey

      • 事件处理完必须手动调用iterator.remove()将事件监听对象selectionKeyselectedKeys中移除,原因是事件通过调用serverSocketChannel.accept()或者socketChannel.read(byteBuffer)就能自动转成已处理状态,但是集合selectedKeys中的selectionKey并不会自动移除,而且SelectionKey只会根据事件监测对象本身设置的事件监测类型来判断selectionKey.isAccepttable()selectionKey.isReadable(),不会根据实际的事件类型来判断,这意味着只要调用了一次serverSocketChannel.accept()或者socketChannel.read(byteBuffer)就必须从selectedKeys中移除对应的selectionKey,否则下次从选择器中selector.selectedKeys().iterator()中获取到的seletedKeys中仍然包含没有事件发生上次已经处理过的selectionKey,由于选择器要配合非阻塞模式一起使用,selectionKey中对应的通道调用serverSocketChannel.accept()或者socketChannel.read(byteBuffer)会直接返回null或者0,此时就非常容易产生空指针异常导致正常的事件无法被处理

    • 多路复用

      • 概念:单个线程配合Selector完成对多个通道可读写事件的监控称为多路复用,多路复用是针对网路IO的概念,普通文件IO没法利用多路复用

  2. 常用方法

    • Selector ---> Selector.open():创建一个选择器

    • int ---> selector.select():该方法在通道没有任何事件时会阻塞当前线程,一旦有通道事件触发,该方法的返回值就会大于0并且让当前线程继续向下执行,注意如果通道事件一直没有被处理,selector.select()方法是不会阻塞的,这就可能导致一些事件一直没有被处理导致CPU不停空转,如果某些事件无法处理可以调用selectionKey.cancel()来取消事件

      • 打断select()方法导致的线程阻塞的情形:客户端发起连接请求会触发accept事件;客户端发送数据、正常关闭和异常关闭会触发read事件,如果发送的数据大于byteBuffer缓冲区的容量会触发多次读取事件;通道可写即操作系统底层可以发起网络数据通信,缓冲区ByteBuffer中有数据就会触发可写write事件;linux系统下select()方法运行会有BUG发生,该BUG发生时当前线程也不会阻塞;调用selector.wakeup()方法唤醒阻塞在select()方法的线程时;调用selector.close()方法时;selector所在线程被调用interrupt()方法打断时

    • int ---> selector.select(long timeout):超时时间内的阻塞当前线程直到有通道事件发生,如果阻塞事件超过指定超时时间,当前线程会继续向下执行,超时时间单位为毫秒,Netty中做事件监控就使用的这个API

    • int ---> selector.selectNow():该方法不管有没有事件触发,都会立即返回不会阻塞当前线程,检查选择器上是否有事件

    • Iterator<SelectionKey> ---> selector.selectedKeys().iterator():通过选择器获取到时间的迭代器

      • iterator.hasNext():存在事件就返回true

      • iterator.next():在存在事件的前提下能从迭代器中获取到当前位置的事件

      • selectionKey.isAcceptable():判断事件类型是否为接入事件

      • selectionKey.cancel():取消事件对应通道在选择器中的注册,socketChannel.close()是正常关闭当前服务端或者客户端中的socket通道,注意如果客户端连接非正常断开直接关闭,会给服务端发送一个无法被处理的可读事件,该可读事件在调用socketChannel.read()处理时会直接抛出异常,这个异常必须处理,否则会影响到其他通道的监听;而且捕获到异常后必须调用selectionKey.cancel()移除对应通道在选择器中的注册,否则这个可读事件会一直因为无法被处理导致当前线程死循环一直处理该可读事件,导致CPU空转且后续事件无法被处理[这样处理才不会导致因为一个客户端的断开影响到所有客户端的事件处理]

      • SelectableChannel ---> selectionKey.channel():获取事件监听对象对应的通道

      • selectionKey.interestOps(SelectionKey.OP_ACCEPT):指定选择器中的某个selectionKey对应通道关注的事件类型,该方法会直接用新关注事件配置覆盖掉旧的配置

      • selectionKey.isReadable():判断事件类型是否为可读事件

      • selectionKey.attachment():获取通道注册时与通道唯一绑定的附件如一个特定缓冲区等

      • selectionKey.attach(Object att):让当前事件监听对象对应的通道重新绑定一个附件

      • int ---> selectionKey.interestOps():获取旧的关注事件配置

      • selectionKey.isWriteable():判断事件是否为可写事件

    • Set<SelectionKey> ---> selector.keys():获取选择器上注册的已发生事件的事件监听对象的Set集合,注意这里面的事件监听对象除了监听socket数据上传的SocketChannel通道,还有监听socket连接接入服务器的ServerSocketChannel通道,因此不能统一在集合遍历中将所有通道都强转成SocketChannel通道,我们可以通过channel instanceof SocketChannel来判断当前对象是否为指定类型SocketChannel,如果是指定类型返回true我们再对通道进行强转

      • 此外我们遍历该集合处理完时间需要从集合selectedKeys中手动移除事件监听对象,因此不能使用增强for来进行遍历,想在集合遍历的时候能删除应该使用迭代器set.iterator()来进行遍历

    • selector.wakeup():唤醒当前阻塞在selector.select()方法上的线程,如果selector对应线程没有处于阻塞状态,该线程下次调用selector.select()方法时不会进入阻塞状态,该方法的效果类似于LockSupport.unPark()

 

NIO阻塞模式

  1. 概念:NIO的阻塞模式下,阻塞现象体现在NIO中一些会阻塞当前线程的API,阻塞体现在服务端没有检测到客户端接入请求或者客户端写入数据请求时会直接阻塞当前线程导致当前线程停止运行

    • ServerSocketChannel.accept()在没有检测到客户端的连接请求会直接阻塞当前线程,通过SocketChannel.open(new InetSocketAddress("127.0.0.1",9999))或者SocketChannel.open().connect(new InetSocketAddress("127.0.0.1",9999)发起与服务端的连接请求后ServerSocketChannel.accept()才会获取到SocketChannel并继续执行后续的代码

    • socketChannel.read(ByteBuffer buf)没有检测到客户端的写入数据请求时会直接阻塞当前线程直到有客户端数据传输到服务端

  2. 要点

    • 即使是NIO阻塞模式下,一个线程处理多个客户端的接入请求以及读写请求是很麻烦的,因为检测客户端接入和检测每个客户端写数据请求的API都会阻塞当前线程,解决办法就是让一个线程专门负责处理客户端接入请求,让一个线程专门负责一个线程的读写数据请求,但是这就是传统BIO的实现思路,只是NIO的通道和缓冲区也是如此,引入选择器通过监听通道上的事件在相应事件发生的情况下再去调用阻塞API来一个线程处理多个客户端的接入和通信请求

    • 32位JVM一个线程占用内存320K,64位JVM一个线程占用1024K,连接数过多必然导致OOM,而且线程太多会导致因为线程频繁上下文切换降低系统性能

    • 使用线程池技术减少线程数和线程上下文切换频率,但是很多客户端连接长时间不失效,会阻塞线程池中的所有线程,因此线程池配合阻塞式API只适合短连接场景,不适合长连接场景

 

NIO非阻塞模式

  1. 概念:NIO非阻塞模式可以通过serverSocketChannel.configureBlocking(false)socketChannel.configureBlocking(false)两个方法配置

    • serverSocketChannel.configureBlocking(false)开启,该方法的作用是让serverSocketChannel.accept()方法变成非阻塞模式[阻塞是如果客户端请求未接入线程就完全暂停,非阻塞是如果客户端请求未接入调用该方法线程还会继续向下运行,非阻塞模式下如果没有检测到客户端请求接入该方法返回null]

    • socketChannel.configureBlocking(false)设置服务器的socket请求对应的通道模式为非阻塞模式,该方法的作用是让socketChannel.read(ByteBuffer buf)变成非阻塞模式[阻塞是如果客户端没有写数据服务端线程就完全暂停,非阻塞是如果客户端没有向服务端写数据服务端线程还会继续向下运行,非阻塞模式下如果没有检测到客户端写数据请求该方法返回0,如果检测到客户端的写数据请求该方法返回写入缓冲区的字节数]

  2. 要点

    • 非阻塞模式下就能实现通过serverSocketChannel.accept()的返回值和socketChannel.read(ByteBuffer buf)的返回值判断是否有客户端接入请求或者是否有客户端写入数据请求而不会使当前线程陷入阻塞,通过不断循环执行serverSocketChannel.accept()以及依次对所有的socketChannel执行socketChannel.read(ByteBuffer buf)就能实现单个线程处理所有客户端的接入和写数据请求,使用循环来不停空转会浪费大量CPU资源,因此引入选择器Selector

    • 选择器必须搭配socketChannelServerSocketChannel的非阻塞模式一起使用

 

 

多线程优化



  1. 多线程优化

    • 思路:使用多个selector和多个线程进行优化,一个线程对应一个selector,一个选择器和一个线程组成一个单元,将这些单元分成两种角色,一种角色为Boss,一种角色为Worker;角色Boss只负责监听处理通道接入请求,一般只设立一个;Worker只负责通道数据的读写,一个Worker上可以注册监听处理多个通道上的读写事件,一般设置多个Worker

    • Boss实现

      • 将当前线程命名为boss,创建一个名为boss的选择器,使用该选择器监听一个serverSocketChannel上的所有accept事件,提前创建一个Worker数组限制Worker的容量,使用合适的策略将接入的socket通道派发注册到Worker对象中

        • 执行socketChannel.register(worker.selector,SelectionKey.OP_READ,null)的boss线程会被在其他线程上处于阻塞状态的selector.select()方法阻塞:注意这里创建Worker对象的时机很讲究,因为Worker对应的线程在Worker对象创建时就会启动并直接阻塞在selector.select()方法处,selector.select()方法因为在Worker对象的构造方法中启动worker线程,很可能会比socketChannel.register(worker.selector,SelectionKey.OP_READ,null)先执行,selector.select()不仅会阻塞worker对应线程,还会影响到boss线程,这里类比成boss线程没法获取锁无法执行通道注册代码即selector.select()方法阻塞时其他线程也无法将其他通道注册到selector中,此时就会发生selector.select()因为选择器没有注册任何通道无法监听到事件一直阻塞,boss线程因为selector.select()一直阻塞无法向选择器注册通道,导致程序直接卡死,而且这种情况表现的现象和正常服务端启动后的现象一样,甚至不知道发生了死锁;只要socketChannel.register(worker.selector,SelectionKey.OP_READ,null)selector.select()方法先执行就不会出现上述问题;但是又出现新的问题,Worker对象对应线程总会阻塞在selector.select()方法上,以后如果还有新的客户端接入,boss线程向该selector注册新通道时肯定会因为selector.select()方法在其他线程阻塞而无法进行注册

        • 🔑:解决办法是将socketChannel.register(worker.selector,SelectionKey.OP_READ,null)放在selector对应线程,让该方法始终在selector.select()后面执行,这里涉及到要将accept事件获取到的通道socketChannel共享到selector所在线程中,我们可以给该线程绑定的单元Worker设置一个唯一的线程安全的队列ConcurrentLinkedQueue<Runnable>在线程间共享一个通道数据,在boss线程以任务的形式将socketChannel.register(worker.selector,SelectionKey.OP_READ,null)通过queue.add()将任务添加到队列中,然后在boss线程调用selector.wakeup()保证如果selector对应线程处在阻塞状态唤醒该线程,如果selector对应线程没有处在阻塞状态那么该线程下次调用selector.select()方法时不会陷入阻塞;selector所在线程只要被唤醒,立刻通过queue.poll()弹出runnable对象并通过runnable.run()手动执行通道注册到选择器上的任务,这是Netty的实现

        • 还有一种简单的方法是在boss线程执行socketChannel.register(worker.selector,SelectionKey.OP_READ,null)前执行selector.wakeup()来避免注册时selector对应线程阻塞在selector.select()方法上,从原理上来说这个方法可行,因为线程被唤醒后还要抢夺CPU时间片并且执行读写业务,极大概率比注册代码执行的慢;但是现在的CPU都是多核,而且boss线程也有刚好执行到selector.wakeup()就进行上下文切换导致注册代码执行前selector又阻塞在selector.select()方法上了,这种情况概率小但是一旦遇上boss线程就卡死[有很多弹幕也都说这种方式会出现阻塞现象]

        • 最简单的负载均衡是使用原子整数自增并对Worker总数量取模得到worker数组对应的下标,一般将线程数设置为CPU核心数+1,具体数量可以通过阿姆达尔定律进行计算

    • Worker实现

      • Selector和线程封装到Worker对象中,让Worker对象实现Runnable接口,在构造的时候创建selectorthread并启动线程,实现Runnable接口的run()方法,在run()方法中seletor.select()监听该worker下的所有通道事件,进入正常的读写流程

        • 读写流程有很多细节,比如黏包半包的处理,客户端正常异常断开要根据channel.read(byteBuffer)来正确处理异常取消通道在选择器的注册;如果单次写数据量超过网络传输上限还要设计监听可写事件完整写入数据并移除对可写事件的关注

        • 而且必须保证创建selectorthread并运行线程的代码只被执行一次,老师的解决方案是给一个标志位成员变量,只有标志位为false的时候实例化selectorthread的代码才能执行,实例化以后将标志位改为true,后续实例化selectorthread的代码就无法被执行了

 

 

AIO

  1. AIO[异步IO]

    • 概念:AIO也称为NIO2.0版本[A就是Async,表示异步],异步非阻塞式通信模式,服务器为一个有效请求分配一个线程,每个客户端I/O请求先交给操作系统处理完成后再通过回调机制通知服务器启动线程进行接收和处理,服务器程序因为将I/O操作交给操作系统底层处理自己返回做其他事情,充分调用操作系统参与并发操作,因此适合一些比较长的连接操作,AIO是将缓冲区数据写出到通道或者将数据从通道写入到缓冲区时是由操作系统异步完成的,完成后才会通过回调函数调用线程来做下一步处理

      • 就是当前线程调用操作系统读写API并指定回调方法后会直接返回继续执行其他业务,操作系统的API阻塞等待数据从网卡复制数据到内存不会阻塞当前线程,操作系统执行完上述操作后会将结果传参到回调方法的入参通过回调方法通知用户程序

    • 适合连接数目比较多且连接比较长的系统架构,比如相册服务器,因此IO操作完全交给操作系统异步执行,因此数据量大不会影响选择器对通道的轮询,因此支持每个通道数据量比较大的场景

    • JDK1.7引入,AIO也被称为NIO2,主要在java.nio.channels包下增加了四个异步通道AsynchronousSocketChannelAsynchronousServerSocketChannelAsynchronousFileChannelAsynchronousDatagramChannelAIO在实际开发中不成熟,更高级的网络编程通信框架Netty也是基于NIO实现的

      • Windows对AIO的支持更好,通过IOCP实现真正的异步IO;但是LInux系统下异步IO在2.6引入,底层实现还是使用多路复用模拟异步IO,性能上没有优势,Java程序大多运行在Linux环境下,所以对AIO的支持不好,Netty5.0也想实现异步IO,最后发现性能没优势还更加复杂,就把Netty5.0废弃了,Netty也不支持异步IO即AIO

    • 文件也支持异步IO,但是只有网络才支持多路复用,这点要注意

  2. 文件异步IO使用举例

     

 

Path & Paths

  1. Path[java.nio.file.Path]类和Paths工具类的常用方法

    • Paths通过文件路径获取Path对象,目录层级支持.[当前路径]..[上级路径];还可以指定目录的层级,不一定必须是文件

      • Path ---> Paths.get("1.txt"):代表类路径下的1.txt

      • Path ---> Paths.get("d:\\1.txt"):代表绝对路径d:\1.txt

      • Path ---> Paths.get("d:/1.txt"):代表绝对路径d:\1.txt

      • Path ---> Paths.get("d:\\data","projects"):代表绝对路径d:\data\projects

    • path.normalize():由Path path=Path.get("d:\\data\\projects\\a\\..\\b")获取的path对象调用path.toString()仍然会返回d:\data\projects\a\..\b,如果我们调用path.normalize()会返回字符串d:\data\projects\b

 

Files

  1. Files的常用方法

    • boolean ---> Files.exists(path):检查对应路径下的文件是否存在,文件存在返回true,不存在返回false

    • Files.createDirectory(path):创建指定目录,如果目录已经存在会抛出FileAlreadyExistsException;不能一次性创建多级目录,会抛出NoSuchFileException

    • Files.createDirectories(path):创建指定目录,可以创建多层目录

    • boolean ---> Files.isDirectory(Path source):判断指定路径是否是一个目录

    • boolean ---> Files.isRegularFile(Path source):判断指定路径是否是一个普通文件

    • Files.copy(Path source,Path target):将source对应文件路径下的文件拷贝到target对应文件路径下,如果target对应文件路径下的文件已经存在,会抛出异常FileAlreadyExistsException[这个方法的效率比较高,底层使用的操作系统的实现,和fileChannel.transferTo(isChannel.position(),isChannel.size(),osChannel)的底层实现不同,但是效率差不多]

    • Files.copy(Path source,Path target,StandardCopyOption.REPLACE_EXISTING):将source对应文件路径下的文件拷贝到target对应文件路径下,如果target对应文件目录已经存在则直接将旧文件覆盖

    • File.move(Path source,Path target,StandardCopyOption.ATOMIC_MOVE):将source对应文件路径下的文件移动到target对应文件路径下,StandardCopyOption.ATOMIC_MOVE用于保证文件移动的原子性

    • Files.delete(Path target):删除target指定文件路径下的文件或者目录,如果该文件不存在就抛出异常NoSuchFileException,如果目录下还有内容会抛出异常DirectoryNotEmptyException[即该方法只能删空目录]

    • Files.walk(Path source):该方法返回的是source目录下所有文件和目录的Path对象的Stream流,可以对指定目录下的所有目录和文件进行流式操作

    • Files.walkFileTree(Path start,FileVistor vistor):从上到下一次遍历指定目录下的所有文件和目录

      • FileVistor是一个接口,这里需要传参FileVistor或者子接口比如SimpleFileVisitor<T>()的匿名实现[这个T是指定变量过程中回调方法参数列表中的文件或者目录的类型],下面以SimpleFileVisitor<Path>()的匿名实现为例,SimpleFileVisitor可以实现的方法如下:

        • 🔎:遍历打印所有文件和目录只需要重写preVisitDirectory()visitFile()方法,注意重写的时候不要修改方法的返回结果,修改返回结果可能会影响到walkFileTree()方法对文件的遍历;注意windows选中文件夹查看目录信息中的目录个数不包含当前起始文件夹;这个遍历文件夹的设计是23种设计模式中的访问者模式的一种体现,new SimpleFileVisitor(){}是访问者,遍历文件夹是Files.walkFileTree()方法中实现的,在访问者的回调方法中加入用户自定义的访问逻辑在遍历到具体文件和目录时刻来执行

        • 🔎:注意通过walkFileTree()遍历文件目录来删除文件即使用程序的方式删除文件是非常危险的,因此这种方式删除的文件不会进入回收站,删了就永久没了,因此使用代码删除文件的操作一定要考虑地慎之又慎,这是危险代码,执行了文件就会彻底消失

        • FileVisitResult ---> preVisitDirectory(T dir,BasicFileAttributes attrs)walkFileTree()方法进入每个目录前都会执行preVisitDirectory()方法,当前正在遍历的目录路径等信息会被封装到dir属性中,注意该方法和postVisitDirectory()方法再进出初始start目录前后也会分别调用

        • FileVisitResult ---> postVisitDirectory(T dir,IOException exec)walkFileTree()方法离开每个目录后都会执行postVisitDirectory()方法

        • FileVisitResult ---> visitFile(T file,BasicFileAttributes attrs)walkFileTree()方法遍历访问每个文件时会执行visitFile()方法,当前正在遍历的文件路径等信息会被封装到回调函数的file属性中

        • FileVisitResult ---> visitFileFailed(T file,IOException exec)walkFileTree()方法遍历访问每个文件失败时会执行visitFileFailed()方法

         

 

概念对比

  1. Stream和Channel

    • Stream是比较高层面的API,不会关心和使用系统层面的一些辅助功能,比如网络编程中发送接收数据自动使用操作系统中的一些缓冲区;Channel更底层,会使用操作系统提供的发送接收缓冲区这样的功能提升API性能

    • Stream只支持阻塞式API,Channel同时支持阻塞、非阻塞式API,网络Channel还可以配合选择器selector实现多路复用

    • 二者都是全双工通信,即通信的双方可以同时通信,读数据的时候可以写数据,写数据的时候可以读数据;半双工通信是通信双方必须交替通信,BIO和NIO都是全双工

      • JDK中的socket通信和Netty基于NIO的网络通信都是全双工通信,任意时刻连接通道上都允许客户端和服务端之间的双向通信,对于任意一个端读和写数据都可以同时进行,只要分别采用读线程和写线程读写数据即可,读数据操作不会阻塞写数据操作,写数据操作也不会阻塞读数据操作

  2. IO模型[同步异步,阻塞和非阻塞,多路复用]

    • 用户程序空间:用户程序的执行比如inputStream.read()不是真正从网络中读取数据,Java程序本身没有这个能力;实际上是inputStream.read()调用操作系统的API切换到Linux内核空间来准备读取指定端口的数据,用户程序空间暂时认为是用户程序运行的环境,也称用户态,Java程序去调用操作系统的方法的过程就叫做由用户态切换到内核态

    • Linux内核空间:程序执行read()调用操作系统从网络端口读取数据的API,但是数据此时不一定发过来了,操作系统会等待数据,数据到达以后,操作系统会将数据从网卡读取复制到内存中,数据复制结束了程序才会切换回用户程序空间,操作系统方法执行的环境称为内核态

    • 阻塞IO:用户程序调用操作系统的阻塞API,将程序执行由用户程序空间切换到操作系统内核空间,操作系统的阻塞API等待数据,用户程序此时也会处于阻塞状态;等阻塞API将数据拷贝到内存以后再切换回用户程序空间停止用户程序的阻塞状态

    • 非阻塞IO:用户程序调用操作系统的非阻塞API,操作系统从端口没有读取到数据不会等待数据,立刻返回给程序读取到0字节,用户线程按一定策略一定时间后再次去调用操作系统的非阻塞API,直到操作系统非阻塞API检查到网卡中有数据,操作系统会将网卡数据复制到内存再返回让程序继续向下运行,用户线程在数据复制阶段仍然是阻塞的,只是等待数据阶段不会进入阻塞状态

      • 非阻塞IO本身相对于阻塞IO并没有很高的提升,因为用户线程会频繁地调用操作系统的API,每次调用都设计用户程序空间到操作系统内核空间的切换,一方面用户程序不停空转,另一方面程序空间和内核空间的频繁切换也会影响系统性能

    • 多路复用:我们通过selector.select()方法就是在切换操作系统的内核空间调用操作系统API去查询每个通道是否有事件,直到相关通道有事件发生,操作系统就会切换回用户程序空间并告知用户线程有事件发生;用户程序根据事件类型、通道去调用socketChannel.read()切换到操作系统内核空间调用操作系统的API将数据读取到内存中再切回用户程序空间执行业务

      • 多路复用比直接调用读写API多出一层事件监听上的阻塞,但是单个线程一直调用读写API如果数据迟迟不到会一直阻塞等待,但是此时别的客户端通道的数据可能到了,该线程因为阻塞无法去执行这些数据的处理,只能使用一个客户端一个线程的方式来一对一处理,这样就导致了系统资源的不可控,使用线程池线程资源又迟早被耗尽;使用多路复用虽然每个事件都会多一次切换,但是只有selector.select()的线程会陷入阻塞等待监听多个客户端通道的接入读写事件,相当于一个线程专门监听所有通道上的事件,通道有事件才去使用单独的线程或者使用selector.select()所在线程去处理所有通道上的读写事件

      • 多路复用的核心就是使用一个线程监听所有客户端通道的接入读写事件即等待数据过程取代原来的一个线程只能等待一个通道的数据,而且会一次性返回阻塞期间监听到的所有事件,用户直接去通过对应通道的读写接入API再切换到操作系统内核状态直接处理客户端数据即可

    • 同步:当前线程自己去获取结果,获取结果的过程只需要当前线程本身参与,只有获取到结果才能做别的事情[自己去买包烟]

      • 阻塞IO需要自己等待操作系统的执行结果,因此就是同步,这叫同步阻塞

      • 非阻塞IO也需要自己等待操作系统的执行结果,因此非阻塞IO也是同步的,这也叫同步非阻塞

      • 多路复用也是当前线程等待操作系统监听事件的结果,多路复用也是同步的

    • 异步:当前线程自己不去获取结果,由其他线程将结果交给当前线程,获取结果的过程至少需要两个线程协作[叫小弟去买包烟,小弟买了烟将烟交到自己手里]

      • 当前线程创建一个线程去获取用户数据,在运行该线程以后就直接返回继续执行自己的业务,新线程阻塞等待操作系统完成数据获取并将数据以回调的方式通知到当前线程,这种情况就叫异步 ,结果和任务对应主要就是通过回调方法来确定的,回调方法就认为发起异步任务时定义了一个通知方法,入参就是结果,其他线程执行完获取到结果后会自动执行该方法并将执行完的任务结果作为参数传入该方法,这就是异步非阻塞,根本不存在异步阻塞这种说法

  3. 零拷贝

    • Java程序需要读取文件调用操作系统的读文件API从用户态切换为内核态,操作系统可以从磁盘将数据读取到操作系统的内核缓冲区[注意操作系统不能直接将数据读取到Java对象如缓冲区中,只能读取到操作系统的内核缓冲区],操作系统将数据从内核缓冲区复制到Java程序的用户缓冲区[用户缓冲区就是Java中的对象如缓冲区],然后由内核态切换到用户态

    • Java程序向网络写数据也不能直接从用户缓冲区即对象中写入网卡,必须先由用户缓冲区写入socket缓冲区,再从socket缓冲区写入网卡

    • Java层面的IO不是实际物理设备级别的直接读写,而是缓存的复制,真正的物理设备级别的读写是由操作系统完成的,一个文件的读取写出Java要经过四次拷贝,因此Java中的IO效率很低

    • Java这种低效IO情况可以使用直接缓冲区DirectByteBuffer优化,直接缓冲区使用的是操作系统内存,这块操作系统还有个特点,操作系统和Java程序都可以直接访问这块内存[弹幕说这叫内存映射],此时操作系统就可以直接将磁盘数据读取到直接缓冲区,少了一次操作系统从直接缓冲区拷贝到用户缓冲区的过程,但是仍然需要从直接缓冲区将数据拷贝到socket缓冲区,再由socket缓冲区写入网卡,使用直接缓冲区能少一次内核缓冲区到用户缓冲区的拷贝,仍然还有三次拷贝

    • Linux2.1以后提供一个sendFile方法,Java中的transferTo()或者transferFrom()方法底层就使用的该API,Java调用了这两个方法从Java程序用户态切换成内核态,不会使用CPU而是使用DMA将数据读取到内核缓冲区,然后数据会通过CPU直接从内核缓冲区读取到socket缓冲区,并且使用DMA不使用CPU将socket缓冲区数据写入网卡,和直接内存相比仍然是三次拷贝,但是只有调用transferTo()或者transferFrom()方法时从用户态切换成内核态,此后所有的操作都由操作系统完成,而且只有一次从内核缓冲区拷贝到socket缓冲区使用的是CPU

    • Linux2.4以后改变了sendFile方法的实现,Java中的transferTo()或者transferFrom()方法会直接将内核缓冲区的数据拷贝到网卡,文件数据连socket缓冲区都不经过,全程只有DMA[直接内存访问技术,DMA是支持直接内存访问专门负责数据传输的硬件,数据传输不需要使用CPU,减少CPU缓存的伪共享]两次拷贝不会使CPU,只有文件的offset偏移量和文件长度length等少量数据会拷贝到socket缓冲区

      • 只要Java中使用transferTo()或者transferFrom()方法,对应底层linux中使用sendFile方法的拷贝过程都可以称为零拷贝,零拷贝指的是不用在用户缓冲区层面发生数据复制,零拷贝的优点是用户态和内核态之间的切换少,不使用CPU可以减少CPU缓存的伪共享;注意零拷贝只适合小文件传输

 

 

 

基本组件

  1. 服务端基本组件

    • ServerBootstrap:负责组装Netty组件并启动服务器

      • serverBootstrap.group(EventLoopGroup)方法设置一个EventLoopGroup用做BossEventLoopWorkerEventLoop的容器,这种方式会自动在事件循环组中分配处理accept事件的EventLoop和处理读写事件的EventLoop

      • serverBootstrap.group(EventLoopGroup bossGroup,EventLoopGroup workerGroup):设置两个EventLoopGroup,第一个事件循环组作为BossEventLoop的事件循环组,只处理所有通道的accept事件;第二个事件循环组作为workerEventLoop的事件循环组,只处理所有通道上的读写事件

        • 注意serverBootstrap.group(EventLoopGroup)会以当前机器的核数乘以2或者用户指定数量作为eventLoopGroup中的eventLoop数量;但是serverBootstrap.group(EventLoopGroup bossGroup,EventLoopGroup workerGroup)只会在bossGroup中设置一个eventLoop[原因是服务端始终只会创建一个ServerSocketChannel]

      • serverBootstrap.channel()方法指定服务端的ServerSocketChannel实现

      • serverBootstrap.childHandler()方法通过自定义ChannelInitializer实现并重写initChannel方法作为入参为WorkerEventLoop组合多种Handler来自定义WorkerEventLoop的具体功能[child就是worker的意思]

      • serverBootstrap.bind()方法指定当前EventLoop组监听的端口

      • ServerBootstrap ---> serverBootstrap.option(ChannelOption<Object> option,Object value):给启动器做一些手动配置,option方法是针对整个服务端的全局配置,对应的是配置ServerSocketChannel的参数

        • ChannelOption是一个枚举类,里面的各个枚举值就是Netty中的对应组件,用户可以通过枚举值对Netty组件参数进行配置

          • ChannelOption.SO_RCVBUFNetty的接收缓冲区,通过serverBootstrap.option(ChannelOption.SO_RCVBUF,10)可以设置Netty的接收缓冲区滑动窗口大小为10,SO_对应TCP套接字的参数,该参数是TCP接收消息的滑动窗口大小

            • 没有指定滑动窗口大小的情况下,操作系统会自动根据服务端和客户端的网络数据传输情况自适应调整滑动窗口的大小

          • ChannelOption.SO_SNDBUFNetty的发送缓冲区,该参数对应TCP发送缓冲区的滑动窗口大小

          • ChannelOption.SO_TIMEOUT:既不是使用在Netty中也不是使用在NIO中,而是使用在最传统的阻塞式IO下,阻塞式IO下的accept或者read方法默认情况下都是无限等待,不希望永久阻塞,使用参数SO_TIMEOUT来设置等待时间

          • ChannelOption.CONNECT_TIMEOUT_MILLISNetty客户端的连接超时时间,单位是ms,比如设置连接超时时间为300ms,客户端发起连接时如果指定毫秒时间内无法建立连接会抛出ConnectTimeoutException

            • 注意如果时间设置的太长,而且确实连接不上服务端,比如设置连接超时时间为5s,但是实际可能到2s的时候Java自身就抛出其他异常比如ConnectException连接异常

            • 源码上是从配置对象中获取用户设置的等待超时时间ChannelOption.CONNECT_TIMEOUT_MILLIS,如果该参数大于0ms,使用eventLoop.schedule(runnable,ChannelOption.CONNECT_TIMEOUT_MILLIS,TimeUnit.MILLISECONDS)设置一个超时时间,该定时任务会直接创建一个ConnectTimeoutException将该异常通过connectPromise.tryFailure(cause)封装到客户端发起连接返回的ChannelFuture对象中,实际的对应子实现类都是AbstractBootstrap$PendingRegistrationPromise,主线程的channelFuture.sync()发现拿到的结果是一个异常会直接抛给主线程

            • 默认超时时间为30s

          • ChannelOption.SO_BACKLOG:配置TCP中全连接队列的容量大小,即服务端允许同时建立连接的客户端数量

      • ServerBootstrap ---> serverBootstrap.childOption():给Channel连接通道做一些手动配置,对应的是配置SocketChannel的参数

        • 同样可以使用ChannelOption中的枚举值

          • ChannelOption.RCVBUF_ALLOCATOR:调整Netty的接收缓冲区ByteBuf的容量,参数值需要设置为AdaptiveRecvByteBufAllocator对象,AdaptiveRecvByteBufAllocator对象可以通过构造方法new AdaptiveRecvByteBufAllocator(16,16,16)来实例化,三个入参依次为接收缓冲区容量的最小值、初始值和最大值,最小只能取16,Netty内部处理时如果用户指定的比16小也只会最小应用16

          • ChannelOption.TCP_NODELAYTCP中的消息可能会被拆成多个数据包发送,一些数据包的数据太少可能会触发Nagle算法等待积攒消息,服务端等待接收一条完整消息就可能存在延迟,Netty中的ChannelOption.TCP_NODELAY默认参数值为false,即开启TCP中的Nagle算法优化,但是一般服务端都希望消息不要出现延迟,实际生产中最好关闭nagle算法,将该ChannelOption.TCP_NODELAY参数值设置为true

          • ChannelOption.SO_SNDBUF:调整TCP发送缓冲区的滑动窗口大小,该参数用户自己调整是画蛇添足,早期一般会调整,但是现代的操作系统实现的TCP流量控制和拥塞控制机制会自动控制该参数,而且滑动窗口的容量也不是越大越好,太大对系统内存的占用会比较高

          • ChannelOption.SO_RCVBUF:调整TCP接收缓冲区的滑动窗口大小,该参数用户自己调整是画蛇添足,早期一般会调整,但是现代的操作系统实现的TCP流量控制和拥塞控制机制会自动控制该参数,而且滑动窗口的容量也不是越大越好,太大对系统内存的占用会比较高

          • ChannelOption.ALLOCATORAllocatorByteBuf的分配器,用户调用channelHandlerContext.alloc().buffer()channelHandlerContext.alloc()拿到的就是分配器对象,分配器默认分配的ByteBuf是一个池化的直接内存PooledUnsafeDirectByteBuf,分配器的配置源码在接口ChannelConfig的子实现中,默认的配置项都在DefaultChannelConfig中,通过对变量allocator进行赋值追踪,默认值的源码在ByteBufUtil的静态代码块中,重点是临时变量alloc的设置逻辑

            • 如果allocType是字符串unpooled就赋值UnpooledByteBufAllocator的默认实例

            • 如果allocType是字符串pooled或者是其他字符串都赋值PooledByteBufAllocator的默认实例

            • allocType通过SystemPropertyUtil.get()方法获取JVM的运行时系统变量io.netty.allocator.type的值,如果用户启动JVM时没有配置变量io.netty.allocator.type的值,会通过PlatformDependent.isAndroid() ? "unpooled" : "pooled"判断当前操作系统的平台是不是安卓,如果是安卓变量allocType设置为unpooled,如果不是安卓则设置为pooled

              • JVM的运行时系统变量通过启动时设置VM options-Dio.netty.allocator.type=unpooled来设置,io.netty.allocator.type是系统变量值的键,注意该系统变量还是设置ByteBuf是池化还是非池化,并没有设置ByteBuf使用堆内存还是直接内存

            • UnpooledByteBufAllocator.DEFAULT被赋值时会调用new UnpooledByteBufAllocator(boolean preferDirect)方法,入参preferDirect表示是否首选直接内存,该参数由PlatformDependent.directBufferPreferred()方法确定,该方法返回变量DIRECT_BUFFER_PRESERRED的值,该值通过方法!SystemPropertyUtil.getBoolean("io.netty.noPreferDirect",false)获得,该系统变量的默认值是false,即首选直接内存;如果想把ByteBuf改成堆内存只需要在JVM启动时配置系统变量-Dio.netty.noPreferDirect=true

            • 启动JVM时同时设置多个环境变量配置成-Dio.netty.allocator.type=unpooled -Dio.netty.noPreferDirect=true即可,这就是非安卓平台将ByteBuf设置为非池化堆内存的方法

              • 注意即使做了上述配置,对于网络IO读写数据时Netty强制使用直接内存,不会根据系统参数配置妥协使用堆内存,这是为了保证网络IO读写的效率,该系统参数只有对处理器中使用分配器分配的ByteBuf才会生效

          • ChannelOption.RCVBUF_ALLOCATOR:该参数的作用是控制netty接收缓冲区ByteBuf的字节容量大小,注意这个接收缓冲区指的是网络原始IO数据存放的ByteBuf,不是TCP中的接收缓冲区滑动窗口的容量大小

            • AbstractNioByteChannel#read()方法中定义了接收最原始网络数据UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(cap=1024)的逻辑,ByteBuf通过代码byteBuf=allocHandle.allocate(allocator)创建,AllocHandleRecvByteBufAllocator的内部类,该内部类通过allocate(allocator)方法传参ByteBuf的分配器allocator创建byteBuf,该入参allocator=config.getAllocator()只控制byteBuf是池化还是非池化;ByteBuf的容量,使用直接内存还是堆内存都是allocHandle决定的,因此这个接收缓冲区ByteBuf是两个分配器共同作用产生的一个ByteBuf

            • config.getAllocator()获取的就是上面ChannelOption.ALLOCATOR中的创建的allocator

            • allocHandle.allocate(allocator)中就是调用的allocator.ioBuffer(guess()),该方法强制创建直接内存类型的ByteBuf[我们平时在控制器中调用的channelHandlerContext.alloc().buffer()不带iobuffer()方法会根据系统参数来判断创建直接内存或者堆内存类型的ByteBuf]guess()方法是猜测缓冲区的大小并通过ioBuffer()方法的入参来设置[guess()方法就是Netty根据过去几次数据量动态分配缓冲区容量大小的体现]

            • allocHandle通过allocHandle=recvBuffAllocHandle()获取,在该方法中通过config().getRecvByteBufAllocator().newHandle()获取,其中config().getRecvByteBufAllocator()返回一个rcvBufAllocatorrcvBufAllocatorDefaultChannelConfig.setRecvByteBufAllocator(allocator)中赋值,该setRecvByteBufAllocator(allocator)方法在DefaultChannelConfig的构造方法中被调用,其中的入参allocator是在DefaultChannelConfig构造方法中创建的AdaptiveRecvByteBufAllocator,通过设置AdaptiveRecvByteBufAllocator的参数设置allocHandle的参数,AdaptiveRecvByteBufAllocator的构造方法中调用this(DEFAULT_MINIMUM,DEFAULT_INITIAL,DEFAULT_MAXIMUM)来进行实例化,入参DEFAULT_INITIAL表示接收缓冲区的默认初始容量为1024DEFAULT_MAXIMUM表示接收缓冲区允许的最大容量为65536DEFAULT_MINIMUM表示接收缓冲区允许的最小容量为64Netty在创建初始1024大小的接收缓冲区后会在64-65536范围内自动增大或者减小接收缓冲区的容量

            • 接收缓冲区的内存类型调用的是allocatorioBuffer(guess())方法定死了是直接内存,是否池化或者非池化还是创建allocator时根据JVM的系统参数io.netty.allocator.type决定,初始容量1024,根据过往的通信情况在64-65536范围内自适应调整接收缓冲区的容量

    • NioEventLoopGroupBossEventLoopWorkerEventLoop的容器

      • EventLoop就是使用多线程对NIO多路复用网络通信的多线程优化中的一个选择器和一个对应的线程

      • EventLoop是经过流水线pipeLine每道工序时处理数据的实际执行者,EventLoop中的线程会依次按流水线中的Handler顺序调用每个Handler的相应事件处理方法,此外还可以为每个Handler指定不同的EventLoop[换EventLoop主要针对非IO操作]

      • EventLoop可以通过selector管理多个通道上的IO操作,而且一个通道上的所有读写操作自始至终都只会由特定的一个EventLoop处理[感觉这就是NIO多线程优化的设计,可能老师的灵感也是从Netty中来的,这种只能由一个特定线程负责读写事件处理是为了避免出现并发安全问题],注意只是在数据的通道IO操作上只能由特定的EventLoop负责,对于非IO操作可以自行指定EventLoop来进行处理

      • EventLoop处理负责每个通道的IO操作,此外每个EventLoop还有一个任务队列,用户可以向任务队列中提交普通任务或者定时任务,每个EventLoop都维护着一个只有单个线程的线程池

    • ServerSocketChannel:就是NIO中的那个ServerSocketChannelNetty中的ServerSocketChannel实现包括

      • NioServerSocketChannel:这是对原生NIOServerSocketChannel做的进一步封装

      • OioServerSocketChannel:基于BIO同步阻塞式IO的ServerSocketChannel实现

      • EpollServerSocketChannel:基于LinuxEpollServerSocketChannel实现

      • KQueueServerSocketChannel:对MacServerSocketChannel实现

    • ChannelInitializer:通过自定义ChannelInitializer实现并重写initChannel方法可以为WorkerEventLoop组合多种Handler来自定义WorkerEventLoop的具体功能

      • initChannel方法在连接建立后被调用

      • 可以初始化的通道类型

        • NioSocketChannel

        • SocketChannel

      • initChannel方法中可以通过nioSocketChannel.pipeLine().addLast(handler)来添加WorkerEventLoop需要的Handler

        • pipeLine意思是流水线,流水线由一道道工序即Handler组成,处理器通过pipeLine().addLast(handler)方法有序加入流水线的最后,流水线负责将事件[读、读取完成、写、添加、添加完成、注册等事件,Netty中的事件在原来NIO的基础上进行了扩展]传播给每个HandlerHandler对自身感兴趣的事件进行处理[这是通过重写Handler对应事件的处理方法实现的]

        • 常用Handler

          • Handler分为Inbound[入站,数据从网络输入时由入站一类处理器来处理]Outbound[出站,数据从本地写出到网络时由出站一类处理器来处理]两类

          • ByteBuf转成字符串的StringDecoder

          • 将字符串转成ByteBufStringEncoder

          • 解码固定长度消息的FixedLengthFrameDeacoder

            • FixedLengthFrameDeacoder需要添加到LoggingHandler之前,否则ByteBuf的数据还没有经过定长解码器处理粘包半包现象就被LoggingHandler打印到控制台了

          • 以换行符作为消息分隔符的LineBasedFrameDecoder

            • LineBasedFrameDecoder需要添加到LoggingHandler之前,否则ByteBuf的数据还没有经过定长解码器处理粘包半包现象就被LoggingHandler打印到控制台了

          • 以自定义字符符作为消息分隔符的DelimiterBasedFrameDecoder

            • DelimiterBasedFrameDecoder需要添加到LoggingHandler之前,否则ByteBuf的数据还没有经过定长解码器处理粘包半包现象就被LoggingHandler打印到控制台了

          • 基于长度字段来确定消息边界的LengthFieldBasedFrameDecoder

            • DelimiterBasedFrameDecoder需要添加到LoggingHandler之前,否则ByteBuf的数据还没有经过定长解码器处理粘包半包现象就被LoggingHandler打印到控制台了

          • 解码HTTP协议通信消息的CombinedChannelDuplexHandler解码器,会将HTTP消息解码成消息头HttpRequest和消息体HttpContent

          • 用户可以通过自定义一个SimpleChannelInboundHandler<T>在泛型中添加消息的特定类型,可以实现处理器只对特定类型的消息起作用

            • 比如socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>())即处理器SimpleChannelInboundHandler只会对HttpRequest类型的msg生效,如果是HttpContent类型的消息该处理器就会跳过不执行

            • SimpleChannelInboundHandler<T>是入站处理器,通过重写ChannelRead0(ChannelHandlerContext ctx,T msg)来指定对特定消息的入站处理操作

          • Netty提供了检测连接假死的空闲状态检测器的处理器IdleStateHandler,该处理器判断连接可能存在问题的原理是判断读或者写之后的空闲时间太长

            • IdleStateHandler的构造方法new IdleStateHandler(int readerIdleTimeSeconds,int writerIdleTimeSeconds,int allIdleTimeSeconds),入参readerIdleTimeSeconds是设置检查读取数据的空闲时间超过指定时间,writerIdleTimeSeconds是设置检查写入数据的空闲时间超过指定时间,allIdleTimeSeconds是设置读写的空闲时间都超过指定时间

          • LoggingHandler:可以展示channel的运行流程和状态显示出来帮助用户调试和理解代码的执行流程,通过在Handler的构造方法中指定LogLevel.DEBUG可以将日志级别调整为DEBUG,而且会自动打印客户端传输过来的ByteBuf中的内容

            • 无法识别的单个字节会打印成.,无法识别的字节一般是占用多个字节的类型数据,比如int类型数据,或者特殊字符比如换行符\n

            • 这个日志处理器底层还是使用的logback,在logback.xml中的configuration标签下添加logger标签

        • nioSocketChannel.pipeLine().addLast(EventExecutorGroup group,String name,ChannelHandler handler):入参EventExecutorGroup是指定执行当前处理器任务的事件循环组EventLoopGroup替换掉通道注册的事件循环组;String是指定处理器的名字;ChannelHandler是指定处理器

      • 通过ChannelInboundHandlerAdapter的匿名实现并重写其中特定的方法我们可以自定义Handler

        • channelInboundHandlerAdapter.channelRead()方法是指定通道触发读事件后的操作

          • channelRead()方法的入参中的msg是在Handler中流经的数据,Handler理解成一道道工序,对最初的ByteBuf进行加工,如果想将处理的结果传递给下一个Handler需要在当前Handler的事件处理方法中调用入参的channelHandlerContext.fireChannelRead(msg)

    [Netty基础用法服务端实例]

  2. 客户端基本组件

    • io.Netty.bootstrap.Bootstrap:负责组装Netty组件并启动客户端

      • bootstrap.group():设置EventLoopGroup作为EventLoop的容器,多路复用一般用在服务端,客户端也可以使用;Netty中的客户端机制也是用多路复用机制来实现的

      • bootstrap.channel():设置SocketChannel的具体实现,NioSocketChannel就是对NIOSocketChannel的封装

      • bootstrap.handler():为EventLoop组合Handler处理器

      • bootstrap.connect():通过入参InetSocketAddress为客户端指定服务端的通信地址,返回ChannelFuture对象

        • 该方法是一个异步非阻塞方法,真正发起连接的线程不是创建启动器Bootstrap的线程,而是NioEventLoopGroup中的EventLoop,建立连接是一个漫长的过程,可能时间以秒作为单位比如1秒。

    • ChannelFuture

      • channelFuture.sync():该方法是一个阻塞方法,会阻塞当前线程直到与服务端的连接成功建立后才会继续执行后续代码

        • 如果没有channelFuture.sync()这行代码,调用bootstrap.connect()的线程和执行bootstrap.connect()的线程的线程不是同一个线程,如果在连接还没有建立完就通过channelFuture.channel()获取到Channel并使用该channel向服务端发送消息,消息就会因为连接还没有成功建立直接导致消息丢失;但是只是建立连接过程中发出的消息会直接丢失,连接建立好以后,连接信息会自动封装到channel中且能正常收发消息;

        • 核心是创建启动器的线程异步非阻塞使用EventLoop和服务端建立连接,连接成功建立前在任何线程使用channel收发的消息都会直接丢失,连接建立后连接信息会直接写入channel,此后消息就能正常收发

      • channelFuture.channel():连接成功建立后获取连接通道channel对象

    • Channel:直接把这个Channel理解成NIO中的Channel就行

      • channel.writeAndFlush():将消息从客户端写出

      • channel.pipeline():获取通道对应的流水线

    [Netty基础用法客户端实例]

  3. 网络通信流程

    • 1️⃣:服务端创建启动器类ServerBootstrap

    • 2️⃣:服务端添加EventLoopGroup组件作为BossWorker的容器

    • 3️⃣:服务端根据场景选择ServerSocketChannel实现

    • 4️⃣:服务端使用childHandler方法为通道添加初始化器并指定通道初始化方法initChannel[该方法在与客户端建立流程后才会被调用]

    • 5️⃣:服务端绑定服务端监听的网络通信端口

    • 6️⃣:服务端监听accept事件

    • 7️⃣:客户端创建启动器类Bootstrap

    • 8️⃣:客户端添加EventLoopGroup组件作为BossWorker的容器

    • 9️⃣:客户端根据场景选择SocketChannel实现

    • 🔟:客户端使用handler方法为通道添加初始化器并指定通道初始化方法initChannel[该方法在与客户端建立流程后才会被调用]

    • 客户端使用connect方法连接服务端,服务端和客户端建立连接后,都会分别调用对应的通道初始化器的initChannel方法对通道进行初始化

    • 客户段使用channelFuture.sync方法阻塞当前线程等待与服务端成功建立连接

    • 与服务端建立连接后客户端使用channelFuture.channel方法获取连接对象channel

    • 客户端通过channel.writeAndFlush方法向服务端发送数据,在发送数据前客户端使用所有处理器对消息进行处理,比如使用处理器StringEncoder将字符串数据转换成ByteBuf对象

      • 客户端和服务端收发数据都要经过所有的Handler进行处理,注意有些Handler的特定方法只对某一种事件类型生效,比如自定义的channelInboundHandlerAdapter.channelRead()只会在通道触发读事件时才会被执行

    • 服务端的EventLoop监听到通道触发读事件并处理可读事件获取到ByteBuf,将ByteBuf经过所有预设的处理器进行处理比如使用处理器StringDecoderByteBuf对象转换成字符串,并且调用自定义的处理器的channelRead方法将此前处理器处理的结果打印出来

     

EventLoop

  1. 概念:事件循环对象,EventLoop本质上是一个维护着一个选择器Selector的单线程执行器[执行器的意思是用户可以向该线程池提交普通或者定时任务],线程的run方法不停处理选择器注册的所有通道源源不断的IO事件

    • EventLoop是一个接口,其父接口EventLoopGroup继承自JUC包下的ScheduledExecutorService,该父接口是JDK线程池中专门执行定时任务的线程池,因此EventLoop包含了线程池中的所有方法;此外EventLoop还继承了NettyOrderEventExecutor[有序的事件执行器]OrderEventExecutor继承自EventExecutor

      • boolean ---> orderEventExecutor.inEventLoop(executor())EventLoop提供判断指定线程是否属于当前eventLoop的能力,同名无参方法是判断当前线程是否属于当前eventLoop

      • orderEventExecutor.parent()提供判断当前eventLoop属于哪个eventLoopGroup的能力

      • EventLoop就是线程池,因此也能调用线程池的方法来提交普通任务和定时任务,示例如下

        [提交执行普通任务]

        [提交执行定时任务]

        • 定时任务也有作用,比如做keepalive可以实现连接的保活

    • 只要一个通道和eventLoop绑定,那么该通道上的所有数据IO操作都由该eventLoop处理,一个eventLoop可以管理多个通道上的读写或者接入事件

    • EventLoop的构造方法不太好单独使用,需要传参线程池和SelectorProvider等参数,一般通过实例化EventLoopGroup对象使用EventLoopGroup的默认配置实例化EventLoop会更方便

  2. EventLoopGroup

    • EventLoopGroup是事件循环组,EventLoopGroup管理着一组EventLoop,通过调用eventLoopGroup.registry方法将通道绑定到EventLoopGroup中的一个EventLoop对象上,为了保证IO事件处理时的线程安全此后该通道上的所有IO事件都只会由该EventLoop对象来专门处理

      • 事件循环组继承自NettyEventExecutorGroup,该接口继承自Iterable提供遍历EventLoop的能力并提供next方法获取集合中的下一个EventLoop对象

      • 常用实现类

        • NioEventLoopGroup:功能最全,既能处理通道上的IO事件,用户还能向事件循环组提交普通任务和定时任务

          • NioEventLoopGroup的无参构造方法默认会传参一个线程数nThreads=0,如果线程数为0会使用默认的线程数来作为事件循环组的线程数,如果传参的nThread不为0会以用户指定的线程数来初始化事件循环组;默认线程数在静态代码块中赋值且添加了final关键字,赋值的逻辑首先会读取Netty的一个系统参数io.netty.eventLoopThreads作为线程数,如果该参数没有值会使用NettyRuntime.availableProcessors()*2即本机两倍CPU核心数作为NioEventLoopGroup的线程数;而且最少会保证有一个线程,即基本上认为用户指定了线程数就用用户指定的,用户没有指定就设置为线程数乘以2

          • nioEventLoopGroup.next():依次获取nioEventLoopGroup中的事件循环对象,事件循环组中有多少个线程就有多少个事件循环对象,一遍轮询完了会再次从头开始循环获取;该方法能实现轮询所有事件循环对象,可以实现通道分配事件循环对象上的负载均衡;还可以通过该方法轮询事件循环组中的所有线程来负载均衡提交任务给线程来异步执行

        • DefaultEventLoopGroup:不能处理IO事件,只能处理用户提交的普通任务和定时任务

          • 网络编程也不全是IO操作,已经通过IO操作写入的数据也可能会执行一些普通的处理,该事件循环组主要是用来做异步或者定时任务处理

          • 该对象的用法除了不能处理IO事件和NioEventGroup的用法基本上是一样的

    • 常用方法

      • eventLoopGroup.shutdownGracefully():优雅地关闭事件循环组,现有的任务全部执行完,拒绝接收新的任务,现有任务执行完后会释放掉事件循环组中的所有线程并结束通信进程

  3. 细分EventLoop

    • 用户可以通过serverBootstrap.group(EventLoopGroup bossGroup,EventLoopGroup workerGroup)BossEventLoopWorkerEventLoop分开到两个事件循环组中来对处理不同事件的EventLoop进行细分,但是注意服务端始终只会创建一个ServerSocketChannel,因此bossGroup中也只会有一个EventLoop

    • 此外有些通道的数据量太大可能会存在一些耗时很长的工作,因为一个EventLoop只有一个线程,如果某个通道的任务耗时太长就会影响到其他通道事件的处理,我们可以专门创建一个DefaultEventGroup来处理流水线上非常耗时的非IO操作,通过nioSocketChannel.pipeLine().addLast(EventExecutorGroup group,String name,ChannelHandler handler)来指定执行该工序的EventLoopGroup,注意该流水线的工序对应一个通道也会和DefaultEventGroup中的一个EventLoop进行绑定

      • 同一条流水线不同eventloop之间的切换

        • 如果两个Handler绑定的是同一个EventLoop,当前线程会直接通过当前abstractChannelHandlerContext.invokeChannelRead()方法在同一个线程调用下一道工序

        • 如果两个Handler绑定的不是同一个EventLoop,当前线程会将下一道工序的abstractChannelHandlerContext.invokeChannelRead()方法封装成任务对象提交到下一道工序注册的EventLoop中执行

        [切换线程代码io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()]

        • 源码中很多地方都是这么做的,目的就是为了切换线程来执行一个流水线中的不同工序

         

 

 

Channel

  1. 常用方法

    • channel.close():关闭当前通道,该方法也是一个异步操作,是由其他线程去异步执行的,而且也是一个耗时操作

      • 注意该方法调用完全关闭一个连接通道后启动器所在的线程仍然不会自动结束,因为连接通道关闭只会解除被关闭通道和EventLoop的绑定关系,只有调用eventLoopGroup.shutdownGracefully()才会将事件循环组中的所有线程池全部释放,进程才会结束

    • channel.closeFuture():关闭当前通道后的善后处理

      • sync()的作用是同步等待当前channel的关闭

      • addListener的作用是异步等待channel的关闭

    • channel.pipeline():获取当前通道对应的流水线方便给流水线中添加Handler处理器

    • channel.write():将数据写出网络通道,由于Netty的缓冲机制,这些数据不一定会立即写出到网络通道

      • 可能先将数据缓冲着直到用户调用flush()方法才发送出去,也可能缓冲区达到一定大小了就自动发送出去,注意对面接收到数据是完整的缓冲区数据,即可能是多次调用channel.write()向缓冲区写入数据的拼接字符串

    • channel.writeAndFlush():将数据写出到网络通道并且立刻将缓冲区的数据刷出

    • channel.flush():将缓冲区的数据立刻发送出去

 

EmbeddedChannel

  1. 概念:Netty提供一个用于测试的Channel,可以通过构造方法给通道绑定流水线,无需启动客户端或者服务端就可以模拟向通道写入数据时数据经过入站处理器的过程,也可以模拟向通道写出数据时数据经过出站处理器的过程,EmbeddedChannel主要用来快速测试流水线

    • 构造方法:new EmbeddedChannel(@NotNull ChannelHandler... handlers),直接通过流水线创建Channel,使得用户可以不再通过初始化ServerBootstrap或者Bootstrap来创建channel

  2. 常用方法

    • embeddedChannel.writeInbound(ByteBuf byteBuf)ByteBufhead开始依次向后经过预设入站处理器

      • 一定要注意调用该方法如果是将ByteBuf切片分开发送,一定要在调用前将切片或者ByteBuf调用byteBuf.retain()方法让byteBuf的引用计数加1,否则其他切片还没有发送出去整个ByteBuf就会被释放掉

    • embeddedChannel.writeOutbound(ByteBuf byteBuf)ByteBuftail开始依次向前经过预设出站处理器

 

 

Future & Promise

  1. 概念

    • NettyFuture接口继承自JDK中的Future接口,Netty中的Promise接口继承自Netty中的Future

      • JDK中的Future只有一个future.get()方法同步阻塞当前线程等待异步任务执行结束并获取异步任务的执行结果

      • Netty中的Future既可以同步等待异步任务执行结束得到结果,也可以异步回调的方式执行任务得到结果;但是和JDK中的Future一样都要等到任务执行结束以后才能使用异步任务返回的结果

      • Netty中的Promise除了同步等待和异步回调等待异步任务的结果,还可以脱离任务独立存在,只是作为两个线程间传递结果的容器

  2. JDKFuture常用方法

    • future.cancel():取消已经提交但还未执行的任务

    • future.isCanceled():判断任务是否已经取消

    • future.isDone():判断任务是否已经完成,注意该方法不能判断任务是被成功执行了还是任务失败了

    • future.isSuccess():判断任务被成功执行了还是任务执行失败了

    • future.get():同步阻塞当前线程直到获取到异步任务的结果

    • future.whenComplete()

    [用法示例]

    • 由执行任务的线程创建并将结果放入Future容器返回给调用异步任务的线程

  3. Nettyio.netty.util.concurrent.FutureJDKFuture的扩展方法

    • future.getNow():获取异步任务的执行结果,如果结果还未产生返回null

    • future.sync():阻塞当前线程等待异步任务执行结束,如果异步任务执行失败会抛出异常

      • 注意这个异步任务会自动识别为向Promise设置结果的线程,只要Promise对象结果一设置上就会放行当前线程,用来协调一个线程等待另一个线程的执行结果

    • future.await():阻塞当前线程等待异步任务执行结束,如果异步任务执行失败不会抛出异常,需要用户通过手动调用future.isSuccess()方法来进行判断

      • 注意这个异步任务会自动识别为向Promise设置结果的线程,只要Promise对象结果一设置上就会放行当前线程,用来协调一个线程等待另一个线程的执行结果

    • future.cause():获取异步任务执行失败的错误信息,不会阻塞当前线程,如果没有发生异常就会返回null

    • future.addLinstener():不会阻塞当前线程,为异步任务添加异步回调方法,当异步任务执行结束会以异步的方式使用执行异步任务的线程在回调方法的入参中接受异步任务的执行结果并执行后续操作

    [用法示例]

    • 注意:JDK中的Future.get()可以放在Junit的测试方法中执行阻塞当前线程,异步任务也能正常执行;但是Nettyfuture.addListener()添加的异步回调无法在Junit的测试方法中使用异步线程执行,但是异步任务还是可以正常执行,Nettyfuture.addListener()可以在主方法中正常执行

  4. PromiseNetty中的Future的扩展方法

    • promise.setSuccess(T result):通过主动调用该方法来向promise对象中设置成功执行的结果,甚至不需要等到异步任务执行结束就能设置和获取结果

    • promise.setFailure(Throwable e):通过主动调用该方法来向promise对象中设置执行失败的异常

      • 不管调用promise.setSuccess(T result)还是调用promise.setFailure(Throwable e)都会让promise.await()或者promise.sync()结束等待继续向下运行

    • DefaultPromise()

    [用法示例1]

    • Promise是一个接口,常用实现有DefaultPromise(EventExecutor executor),用户可以直接在调用异步任务的线程通过构造方法传参EventLoop对象手动创建Promise线程,而不是只能通过执行异步任务的线程创建并返回给调用异步任务的线程

    • 在异步任务的调用线程中创建Promise对象,在执行异步任务的线程中将执行结果设置到Promise对象中,Promise只是可以手动设置异步任务的执行结果,promise.get()方法仍然会阻塞当前线程直到异步任务调用结束

    • Promise在网络编程的RPC框架中非常有用,实现Future达不到效果

    [用法示例2]

    • RPCClientManager中为用户提供接口动态代理对象的getProxyService()方法将用户调用远程方法的行为转换成向远程服务发送远程调用消息的行为[这部分细节内容看搭建RPC框架部分内容]

      • 这里演示使用的是JDK动态代理

      • 远程方法调用完响应消息被客户端接收后会被流水线处理成自定义的RPCResponseMessage,流水线的处理得到RPCResponseMessage是在线程eventLoop中完成的,动态代理对象一般在用户线程中完成,这里涉及到两个线程间共享同一个数据的问题,使用Promise容器可以实现多个线程间交换同一个数据

        • 准备一个ConcurrentHashMap以消息序号作为key,以Promise作为值缓存远程调用响应消息经流水线处理后的结果,为了保证多线程并发共享数据的线程安全性使用了concurrentHashMap<Integer,Promise<?>>[?是通配符,表示适配任意类型,这是因为不知道响应的结果是什么类型,注意这里用通配符不行,后续向Promise对象中设置值会出现问题,将通配符改成Object类型]Promise对象由代理对象通过DefaultPromise<?> promise = new DefaultPromise<>(channel.eventLoop())创建后存入ConcurrentHashMap[入参channel.eventLoop()是创建Promise对象需要指定将结果传入Promise对象的线程EventExecutor对象,需要流水线的执行线程即channel.eventLoop()对象,注意该concurrentHashMap老师设置为RPCResponseMessageHandler的一个公有静态变量],代理对象创建Promise对象并将其存入concurrentHashMap后调用promise.await()或者promise.sync()等待eventLoop接收到响应将结果存入promise对象,使用promise.await()使用promise.isSuccess()来判断是否正常成功获取消息,成功获取响应结果直接获取结果设置为代理对象对应方法的返回结果,如果没有成功获取响应结果,包装异常对象promise.cause()直接通过代理对象抛出

    • RPCResponseMessageHandler中增加将流水线处理结果存入concurrentHashMap中的Promise对象的逻辑

      • 通过消息的序列号从concurrentHashMap中获取消息对应的Promise对象,检查远程调用的响应结果是否正常,如果正常调用promise.setSuccess(returnValue)设置远程调用执行结果,如果远程调用有异常就调用promise.setFailure(exceptionValue)将异常信息设置到Promise[注意Gson对Throwable对象向json字符串的转换不需要自定义转换适配器],为了避免序列号错误或者其他错误需要对promise对象判空,只有非空才能进行设置值操作,否则会出现空指针异常,因为各种原因导致集合中没有对应promise对象是可能出现这种问题的

       

       

     

     

     

 

 

ChannelFuture

  1. 概念

    • 用于正确拿到连接通道对象

    • Netty中带有FuturePromise的类都是和异步相关的,一般都是用于正确获取异步线程的执行结果

      • Netty中很多的方法都是异步的,异步方法调用想要正确获取到方法执行结果不能想当然地在异步方法后直接编写获取异步方法执行的结果并做后续操作,必须使用channelFuture.sync()同步阻塞当前线程等待异步方法执行完毕正确返回结果或者channelFuture.addListener(ChannelFutureListener channelFutureListener)异步回调等异步方法执行完毕自动调用的方式来使用异步方法的结果执行后续操作

  2. 常用方法

    • channelFuture.sync():该方法是一个阻塞方法,会阻塞当前线程直到与服务端的连接成功建立后才会继续执行后续代码

      • 如果没有channelFuture.sync()这行代码,调用bootstrap.connect()的线程和执行bootstrap.connect()的线程的线程不是同一个线程,如果在连接还没有建立完就通过channelFuture.channel()获取到Channel并使用该channel向服务端发送消息,消息就会因为连接还没有成功建立直接导致消息丢失;但是只是建立连接过程中发出的消息会直接丢失,连接建立好以后,连接信息会自动封装到channel中且能正常收发消息;

      • 核心是创建启动器的线程异步非阻塞使用EventLoop和服务端建立连接,连接成功建立前在任何线程使用channel收发的消息都会直接丢失,连接建立后连接信息会直接写入channel,此后消息就能正常收发

      • 该方法的优势是通过连接成功建立后的回调来通知当前线程channel可用避免了长时间无意义的等待,该方法的实现可以参考一下JUC里面的保护性暂停

    • channelFuture.channel():连接成功建立后获取连接通道channel对象

    • channelFuture.addListener(ChannelFutureListener channelFutureListener):在客户端与服务端的通道建立完成后自动异步使用其他线程回调channelFutureListener.operationComplete(ChannelFuture future)方法

      • ChannelFutureListener是一个函数式接口,用户通过匿名实现该接口重写其中的operationComplete(ChannelFuture future)方法来指定建立连接后要进一步执行的操作,注意ChannelFutureListener是对应返回值为ChannelFuture的异步方法如Bootstrap.connect();如果异步任务的返回值不是ChannelFuture比如eventLoop.submit(Callable callable)可以使用future.addListener(GenericListener genericListener)

      • 回调方法中的入参ChannelFuture对象就是channelFuture.addListener()中的channelFuture

         

 

CloseFuture

  1. 概念:在连接通道完全关闭以后要执行的操作,ChannelFuture对象可以通过channel.closeFuture()对象获取

    • CloseFuture的实现接口是ChannelFuture

  2. 常用方法

    • closeFuture.sync()

      • 同步模式等待调用channel.close()致使连接通道完全关闭实现阻塞当前线程在通道完全关闭后执行善后处理

    • closeFuture.addListener(ChannelFutureListener channelFutureListener):在客户端与服务端的通道完全关闭后自动异步使用关闭通道的线程即通道对应的EventLoop回调channelFutureListener.operationComplete(ChannelFuture future)方法执行通道完全关闭后的善后操作,ChannelFutureListener是一个接口,用户通过匿名实现该接口重写其中的operationComplete(ChannelFuture future)方法来指定连接通道完全关闭后要进一步执行的操作

 

 

 

Handler & Pipeline

  1. ChannelHandler

    • 概念:ChannelHandler用于处理通道Channel上的各种事件,处理器分为入站出站处理器,所有处理器连成一串就成为了Pipeline,原材料ByteBuf进入流水线,经过流水线上的各道处理器工序加工最后变成产品

    • 入站处理器通常是ChannelInboundHandlerAdapter的子类,用来读取客户端数据以及向客户端写出数据[不一定啊,我看老师的演示也没有这个要求,只是处理一段操作]

      • 入站处理器主要重写channelRead(ctx,msg)方法

        • 注意流水线的工序中也可以调用初始化通道的initChannel(NioSocketChannel channel)方法入参中的channel.writeAndFlush(ctx.alloc().buffer().writeBytes("发送给客户端的数据".getBytes()))向指定通道中写出数据,流水线不会自动将写出到通道的字符串转换成ByteBuf对象,channelHandlerContext.alloc().buffer()是获取一个ByteBuf对象,通过byteBuf.writeBytes(Byte[] bytes)将字符串转换成ByteBuf对象

        • 入站处理器只会在数据从通道写入客户端或者服务端时才会从head向后依次触发

        • 入站处理器需要调用super.channelRead(ctx,msg)才能当前Handler的处理结果传递并将控制权交给下一个入站处理器,最后一个入站处理器调用该方法没有意义;

          • 这个方法内部只有一行代码channelHandlerContext.fireChannelRead(msg),该方法可以在自定义ChannelInboundHandlerAdapter中直接调用将当前处理器的处理结果通过传递给下一个Handlersuper.channelRead(ctx,msg)Object类型的msg就是当前处理器要向下一个处理器传递的数据,流水线最初的msgByteBuf,我们可以在工序处理后将msg替换成处理器对上一道工序结果处理后的结果传递给下一道工序

          • 入站处理器中只要不调用该super.channelRead(ctx,msg)或者channelHandlerContext.fireChannelRead(msg)方法,入站处理器流水线就会在当前处理器直接断掉,后续的入站处理器就不会再执行

        • 如果入站处理器处理数据期间有写出数据的操作,会从当前处理器直接向前依次执行出站处理器,当前处理器后的所有入站和出站处理器都不会执行

      • 入站处理器主要重写channelActive(ChannelHandlerContext ctx)方法

        • 连接通道成功建立后会触发active事件,此时会执行处理器中的channelActive(ctx)方法,这个方法和channelFuture.sync()的作用很相似

      • 重写channelInactive(channelHandlerContext)方法,客户端正常退出情况下从会话管理器中移除用户和对应的连接通道

      • 重写exceptionCaught(channelHandlerContext,Throwable cause)方法,客户端异常退出情况下从会话管理器中移除用户和对应的连接通道

    • 出站处理器通常是ChannelOutboundHandlerAdapter的子类,用来对写入服务端的数据进行加工,

      • 出站处理器主要重写write(ctx,msg,promise)方法

        • 出站处理器只有在服务端或者客户端向Channel中写入数据时才会从tail往前依次触发

        • 出站处理器需要调用super.write(ctx,msg,promise)才能将数据传递给下一个出站处理器

    • Netty中自定义双向处理器可以使用ChannelDuplexHandler的匿名实现,既可以作为入站处理器,也可以作为出站处理器

      • channelDuplexHandler.userEventTriggered(channelHandlerContext ctx,object evt)只有用户自定义的事件或者IdleState下的各种事件才会触发,其中入参Object evt就是事件本身,注意IdleState#READER_IDLE事件的类型是IdleStateEvent

    • 注意帧解码器最好不要设置成多个EventLoop共用,即把同一个处理器对象设置到多个流水线中,应该每个流水线都创建一个独立的帧解码器实例,因为帧解码器不是线程安全的,如果作为共享资源被多个线程共享,就可能出现多个EventLoop的消息混在一起的情况,特别是像帧解码器这种处理半包现象会阻塞等待后续消息的处理器如果被多个线程共享极容易发生其他线程拼接消息到帧解码器上一条还暂存的半包消息,导致两条消息都完全错乱,像帧解码器这种只要能记录多条消息状态的处理器就不能将同一个处理器实例加到多个流水线中

      • Netty为所有支持多线程共享的处理器添加了@Sharable注解,这些处理器Netty充分考虑了其各种场景下的线程安全性,可以添加到不同的流水线中供多个eventLoop共享

      • 没加Sharable注解的处理器可能存在一段时间内记录多条消息的状态,可能出现线程不安全的风险,一般解码器都不是线程安全的

  2. ChannelHandlerContext

    • 常用方法

      • channelHandlerContext.fireChannelRead(msg):将当前处理器的处理结果传递给下一个处理器并将控制权交给下一个处理器

      • channelHandlerContext.writeAndFlush(channelHandlerContext.alloc().buffer().writeBytes("执行输出操作")):从当前处理器开始依次向前遍历执行每个出站处理器,注意channel.writeAndFlush(channelHandlerContext.alloc().buffer().writeBytes("执行输出操作"))是从tail处理器开始依次向前遍历执行每个出站处理器

        • 注意按照此前的客户端启动器为Bootstrap,且NioEventLoopGroup中的EventLoop全部通过bootstrap.channel(NioSocketChannel.class)设置为WorkerEventLoop无法接收到服务端通过channelHandlerContext.writeAndFlush(channelHandlerContext.alloc().buffer().writeBytes("执行输出操作"))channel.writeAndFlush(channelHandlerContext.alloc().buffer().writeBytes("执行输出操作"))写出的数据

  3. Pipeline流水线

    • 概念:流水线底层实际上是一个双向链表,首尾分别是headtail,中间是用户自己组合的Netty提供的处理器或者自定义处理器,可以从head向后连,也可以从tail往前连

    • 常用方法

      • ChannelPipeline ---> channel.pipeline():使用通道获取流水线对象

      • pipeline.addLast(Handler handler):将处理器添加到流水线的尾部

        • Netty会自动给流水线头部添加一个head处理器,给流水线尾部添加一个tail处理器,addLast方法将处理器添加到流水线尾部实际上是添加到tail前面

      • pipeline.addLast(String name,Handler handler):自定义处理器的名字并将处理器添加到流水线尾部

 

 

ByteBuf

  1. 概念:

    • ByteBuf扩容:ByteBuf的容量支持动态扩缩容,ByteBuf可扩容的最大容量是Integer.MAX,写入的数据量超过ByteBuf的当前容量会自动扩容,扩容规则如下

      • 1️⃣:如果写入后的数据大小不超过512,会扩容到当前容量的下一个16的整数倍

      • 2️⃣:如果写入后的数据大小超过512,会扩容到当前容量的下一个2n

      • 3️⃣:扩容后的容量不能超过Integer.MAX,超过最大容量会报错,即容量在512以前按照16的步长扩容,容量超过512每次按照原容量的两倍扩容

    • byteBuf.toString()的基本格式:PooledUnsafeDirectByteBuf(ridx: 0,widx: 0,cap: 256)

      • ridxwidx是读写指针,capByteBuf的总容量,读写指针在ByteBuf创建以后都在索引0处;

        • 容量到最大容量Integer.MAX之间是可扩容部分

        • 写指针到容量之间的部分表示可写区域

        • 读指针到写指针之间的部分表示可读区域

        • 索引0到读指针之间的部分表示废弃区域

        • NIO中的ByteBuffer读写共用一个指针,读写需要使用byteBuffer.flip()切换为读模式,使用byteBuffer.clear()或者byteBuffer.compact()切换为写模式;Netty中的ByteBuf读和写设置为两个指针,ByteBuf的读和写不需要切换读写模式

    • ByteBuf实例化

      • 通过方法channelHandlerContext.alloc().buffer()可以获取一个ByteBuf对象

      • 通过方法ByteBuf bytebuf=ByteBufAllocator.DEFAULT.buffer()可以获取一个ByteBuf对象,不指定入参会采用默认容量256B作为缓冲区容量;也可以通过ByteBuf bytebuf=ByteBufAllocator.DEFAULT.buffer(int initialCapacity)指定缓冲区的初始容量

        • 以上方法创建的都是基于直接内存的ByteBuf,也可以通过ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer()来创建基于直接内存的ByteBuf,效果是一样的

        • 这种方式创建ByteBuf对象最好只在自己做测试时使用,实际生产ByteBuf对象的创建都是放在处理器Handler中,此时建议使用channelHandlerContext.alloc().buffer()来实例化ByteBuf对象

      • 通过方法ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer()可以创建基于堆内存的ByteBuf

    • 直观打印ByteBuf的工具方法

      • 该工具方法只会关注读指针到写指针部分的内容,读取过的废弃部分不会打印输出

      [打印效果]

    • ByteBuf相较于ByteBuffer的优势

      • 采用池化机制使得程序可以重用池中的ByteBuf实例,降低ByteBuf对象创建和销毁或者GC的开销,节约系统内存避免ByteBuf对象过多导致内存溢出的可能

      • 读写指针分离,不需要像ByteBuffer一样切换读写模式

      • 支持自动扩容

      • 方法设计上支持链式调用,使用起来更流畅

      • 在切片slice、复制duplicate和组合多个ByteBufCompositeByteBuf以及Unpooled类中组合多个ByteBuf对象或者字节数组的方法中都对零拷贝思想进行了实现,减少数据复制提高ByteBuf切分、复制和组合操作的性能

  2. 常用方法

    • ByteBuf ---> byteBuf.toString(Charset Charset.defaultCharset()):将字节缓冲区的数据按照指定编码格式转成字符串,实际生产一般都在客户端和服务端指定相同的字符集而不是使用默认的字符集,因为客户端和服务端所在机器的默认字符集可能不一致,非常容易乱码

    • ByteBuf ---> byteBuf.writeBytes(Byte[] bytes):将字节数组转换成ByteBuf对象

    • ByteBuf ---> byteBuf.writeBoolean(boolean value):将boolean值以字节写入ByteBuf01表示true00表示false

    • ByteBuf ---> byteBuf.writeByte(int value):向ByteBuf写入一个字节

    • ByteBuf ---> byteBuf.writeShort(int value):向ByteBuf写入一个short

    • ByteBuf ---> byteBuf.writeInt(int value):向ByteBuf写入一个int值,默认是大端写入Big Endian

      • 大端写入是按照int占有的存储空间从高位到低位依次将value值对应的二进制码以一个字节为一个整体从高位写到低位,比如value值对应16进制0x250,大端写入int类型存储空间对应00 00 02 50

      • 注意向ByteBuf中写入int类型数据会直接占用四个字节的存储空间

      • 一般网络编程都会采用大端写入

    • ByteBuf ---> byteBuf.writeIntLE(int value):向ByteBuf小端写入一个int

      • 小端写入是按照int占有的存储空间从高位到低位依次将value值对应的二进制码以一个字节为一个整体从低位写到高位,比如value值对应16进制0x250,小端写入int类型存储空间对应50 02 00 00

    • ByteBuf ---> byteBuf.writeLong(long value):向ByteBuf写入一个long

    • ByteBuf ---> byteBuf.writeChar(int value):向ByteBuf写入一个char字符

    • ByteBuf ---> byteBuf.writeFloat(float value):向ByteBuf写入一个float

    • ByteBuf ---> byteBuf.writeDouble(double value):向ByteBuf写入一个double

    • ByteBuf ---> byteBuf.writeBytes(ByteBuf src):向ByteBuf中写入一个byteBuf

    • ByteBuf ---> byteBuf.writeBytes(byte[] src):向ByteBuf中写入一个byte数组

    • ByteBuf ---> byteBuf.writeBytes(ByteBuffer src):向ByteBuf中写入NIOByteBuffer

    • int ---> byteBuf.writeCharSequence(CharSequence sequence,Charset charset):向ByteBuf中写入字符串并指定字符集

    • 🔎ByteBuf中一系列以set开头的方法也可以写入数据但是不会改变写指针的位置

    • byteBuf.setByte(int index,byte value):将指定索引处的字节数据更改成指定值

    • byte ---> byteBuf.readByte():从ByteBuf中读取一个字节

    • byteBuf.readBytes(byte[] bytes,int dstIndex,int length):将ByteBuf中当前位置指定偏移量开始读取指定长度指定长度的数据到字节数组bytes

    • byteBuf.markReaderIndex():对当前读指针的位置做标记

    • byteBuf.resetReaderIndex():将当前读指针的位置还原回此前对读指针标记的位置

    • byteBuf.readInt():下一次从ByteBuf中读取一个int类型的数据

    • 🔎ByteBuf中有一系列以get开头的方法也可以读取数据但是不会改变读指针的位置

    • ByteBuf ---> byteBuf.silce(int index,int length)index是从哪个索引开始切片,length是当前切片的长度,返回当前切片,当前切片有完全独立的切片

      • 获取的ByteBuf的写指针和容量对应的索引相同

      • 切片后的ByteBuf不能再进行扩容写入超出容量的数据,会直接抛出异常IndexOutOfBoundsException,这是为了避免切片扩容写数据导致原始ByteBuf的数据发生错乱

    • int ---> byteBuf.readableBytes():获取当前ByteBuf中剩余的可读取的字节长度

  3. ByteBuf的内存模式

    • ByteBuf也可以使用直接内存或者堆内存,ByteBuf默认使用的由ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer()或者ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer()来创建基于直接内存的ByteBuf,用户也可以使用ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer()来创建基于堆内存的ByteBuf

      • 直接内存分配效率低[创建和销毁的代价昂贵],读写效率高[读写效率高的原因是直接内存可以映射到用户缓冲区,减少一次数据从系统缓冲区拷贝到用户缓冲区的过程],对用户应用程序造成的GC压力小[直接内存不受JVM垃圾回收器的管理]

      • 堆内存分配效率高,读写效率低

  4. ByteBuf的池化管理

    • 为了重用ByteBuf从而对ByteBuf采用了池化技术,而且采用了与jemalloc算法类似的内存分配算法提高分配效率,避免每次使用ByteBuf都去创建新的ByteBuf实例,基于直接内存的ByteBuf创建销毁开销又大,基于堆内存的ByteBuf也会给GC带来压力;高并发情况下,ByteBuf池化会更节约内存,降低内存溢出的可能

    • Netty4.1以后非Android平台默认就开启了ByteBuf的池化功能,Android平台默认使用的是ByteBuf的非池化实现;Netty4.1以前池化技术还不完善,非Android平台默认也使用的是ByteBuf的非池化实现

    • 采用池化技术的ByteBuf的实现类的类名上会以Pooled字样打头,比如PooledUnsafeDirectByteBufPooledUnsafeHeapByteBuf;关闭池化功能的实现类的类名会以Unpooled字样打头,比如UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBufUnpooledByteBufAllocator$InstrumentedUnpooledUnsafeNoCleanerDiretByteBuf

    • 如果要设置关闭或者开启ByteBuf的池化功能可以在启动程序的时候通过虚拟机参数-Dio.netty.allocator.type={unpooled|pooled}[即关闭ByteBuf的池化功能可以设置虚拟机参数-Dio.netty.allocator.type=unpooled]

  5. ByteBuf的内存回收

    • 内存回收方式

      • 使用JVM堆内存的非池化ByteBuf实现如UnpooledHeapByteBuf等待GC回收内存即可

      • 使用直接内存的非池化ByteBuf实现如UnpooledDirectByteBuf也可以通过定时的GC回收内存,但是回收不及时,建议调用特定的方法来手动回收内存

      • 使用了池化机制的ByteBuf实现如PooledByteBuf及其子类回收内存不是销毁ByteBuf对象,而是将ByteBuf对象还回ByteBuf池来重用ByteBuf,需要更复杂的内存回收规则

    • 每个ByteBuf实现都实现了ReferenceCounted接口,通过引用计数法来控制ByteBuf对象的回收,基本设计如下

      • 每个ByteBuf对象的初始引用计数均为1;调用byteBuf.release()方法会让引用计数减1,如果引用计数减为0ByteBuf对象就会被回收;调用byteBuf.retain()方法会让引用计数加1,保证当前调用者没有byteBuf.release()释放掉对ByteBuf对象的引用前ByteBuf对象不会被误回收

      • ByteBuf对象的引用计数变成0时,底层内存会被回收,此时即使ByteBuf对象还在,其中的方法也无法正常使用了,真正释放ByteBuf对象内存的方法是protect abstract void deallocate(),该抽象方法对应不同的ByteBuf实现类有对应的内存回收实现,比如释放直接内存、释放堆内存以及将ByteBuf回收到ByteBuf池中

    • 释放ByteBuf的时机

      • 释放ByteBuf的时机应该是哪个Handler最后使用ByteBufByteBuf对象就应该由对应的Handlerfinally块中调用byteBuf.release()释放

      • 流水线中的tailhead两个Handler中也会释放ByteBuf;当执行入站处理器时,执行到tail如果ByteBuf对象还存在就会在tail处理器中释放ByteBuf对象;执行出站处理器时,如果执行到headByteBuf对象还存在就会在head处理器中释放ByteBuf对象

        • 注意headtail只有被传递的数据是ByteBuf时才会主动释放,如果中间工序将ByteBuf处理后传递给后续处理器的数据不是ByteBuf对象,headtail处理器就不会主动对流水线初始的ByteBuf对象进行释放;因此最保险的方法还是在哪个处理器最后使用了ByteBuf对象就在那个Handlerfinally块中对ByteBuf对象进行释放

    • tail处理器销毁ByteBuf源码分析

    • head处理器销毁ByteBuf源码分析

  6. silce()切片方法[零拷贝实现]

    • Netty中的byteBuf.silce()是零拷贝的体现之一

      • NIO中的零拷贝是指文件通道向socket通道传输数据时可以不经过用户缓冲区,直接从系统缓存区拷贝到网卡,通过减少数据复制次数来提高数据IO性能

      • 将原始的ByteBuf进行切片形成多个ByteBuf,切片后所有ByteBuf整体还是原始ByteBuf的内存,只是每个ByteBuf都维护独立的readwrite读写指针;这个过程没有发生任何数据的复制

      • ByteBuf ---> byteBuf.silce(int index,int length)index是从哪个索引开始切片,length是当前切片的长度,返回当前切片,当前切片有完全独立的读写指针和容量,但是数据的内存地址和原始ByteBuf是同一块内存地址

        • 获取的ByteBuf的写指针和容量对应的索引相同

        • 切片后的ByteBuf不能再进行扩容写入超出容量的数据[即ByteBuf的最大容量就是切片长度],会直接抛出异常IndexOutOfBoundsException,这是为了避免切片扩容写数据导致原始ByteBuf的数据发生错乱

      • 原始ByteBuf如果调用byteBuf.release()被释放掉了,切片byteBuf中的方法也无法正常使用了,会抛出IllegalReferenceCountException;但是如果在原始ByteBuf调用byteBuf.release()前就对原始ByteBuf或者任意一个切片调用byteBuf.retain()让引用计数加1,不管是原始ByteBuf还是任意一个切片都不会被销毁,都能被正常使用

        • 一般获取到切片都会手动调用切片byteBuf.retain()来避免原始ByteBuf被误删,并且需要在使用完切片以后手动调用byteBuf.release()来释放byteBuf

  7. duplicate()复制[零拷贝实现]

    • byteBuf.duplicate()也是零拷贝的体现之一

      • duplicate()截取了原始ByteBuf的所有内容,与原始ByteBuf的字节数据共用同一块物理内存,只是读写指针相对于原始ByteBuf是完全独立的,且没有最大容量的限制

  8. copy()拷贝

    • byteBuf.copy()会将原始ByteBuf的数据深拷贝复制一份到新的ByteBuf对象中,无论如何读写新的ByteBuf对象都与原始ByteBuf无关

 

 

CompositeByteBuf

  1. 概念[零拷贝实现]

    • 利用CompositeByteBuf中的方法组合多个ByteBuf为一个ByteBuf,这个过程不会对被组合ByteBuf中的字节数据进行内存复制,相应的维护读写指针之间的关系比较复杂;像byteBuf.writeBytes(byteBuf1).writeBytes(byteBuf2)这种组合多个ByteBuf的方式会将byteBuf1byteBuf2中的字节数据拷贝一份复制到byteBuf对象中

    • 实例化CompositByteBuf对象,通过CompositeByteBuf buffer=ByteBufAllocator.DEFAULT.compositeBuffer()

  2. 常用方法

    • compositByteBuf.addComponents(ByteBuf... buffers):一次性向compositByteBuf中添加多个ByteBuf

      • 注意默认情况下compositByteBuf.addComponents(ByteBuf... buffers)compositByteBuf.addComponent(ByteBuf buffer)方法都不会自动调整写指针位置[默认写指针一直为0,写指针和读指针都为0无法读出数据],容量会自动变化为合并后ByteBuf的容量

    • compositByteBuf.addComponent(ByteBuf buffer):一次性向compositByteBuf中添加一个ByteBuf

    • compositByteBuf.addComponents(boolean increaseWriterIndex,ByteBuf... buffers):一次性向compositByteBuf中添加多个ByteBuf并自动调整写指针位置到最后一个字节

    • compositByteBuf.addComponent(boolean increaseWriterIndex,ByteBuf buffer):一次性向compositByteBuf中添加一个ByteBuf并自动调整写指针位置到最后一个字节

 

Unpooled

  1. 概念

    • Unpooled是一个工具类,提供非池化的ByteBuf的创建、组合和复制操作

  2. 常用方法

    • ByteBuf byteBuf ---> Unpooled.wrappedBuffer(ByteBuf... buffers):将多个ByteBuf对象按顺序依次组合成一个ByteBuf,该方法组合的ByteBuf数量超过一个时,底层使用了CompositeByteBuf的零拷贝API来组合多个ByteBuf,因此该方法组合多个ByteBuf底层不会有拷贝操作

    • ByteBuf byteBuf ---> Unpooled.wrappedBuffer(byte[]... byteArrs):将多个字节数组按顺序依次组合成一个ByteBuf,底层也不会有字节数据拷贝操作

    • ByteBuf byteBuf ---> Unpooled.copiedBuffer(byte[] bytes):将字节数组作为数据内容创建ByteBuf对象

 

黏包半包

黏包

  1. 黏包现象

    • Netty中进行网络通信就会发生黏包现象,这种黏包现象发生是随机的[有时候不会发生,有时候会发生],示例如下

      • 不管通道类型是new ChannelInitializer<NioSocketChannel>(){}还是new ChannelInitializer<SocketChannel>(){},不管是使用channel.writeAndFlush(数据);还是使用hannelHandlerContext.writeAndFlush(数据),经过测试都可能随机发生黏包现象

      • 特别注意:打了断点在参数界面使用变量执行一段额外的发送数据操作不会发生黏包现象,而且默认情况下没有发生半包现象

    [服务端代码]

    [客户端代码]

    [服务端黏包现象]

    • 换了NioSocketChannelSocketChanel以及channel.writeAndFlush(数据)ChannelHandlerContext.writeAndFlush(数据)发送数据都会发生黏包现象

    • 但是发送了很多次数据以及不同类型的数据默认情况下没有出现半包现象

     

 

半包

  1. 半包现象

    • Netty服务端的接收缓冲区相对于传输数据较小时就会很容易发生半包现象,比如手动将Netty的服务端接收缓冲区调整成10字节

    [服务端代码]

    [客户端代码]

    [服务端半包现象]

    • 同时发生黏包半包现象,但是数据总量是准确的

     

原因分析



 

  1. TCP的滑动窗口引起粘包半包现象

    • TCP请求为了保证网络数据传输的可靠性,网络传输数据以段segment为单位,一次完整的数据可能被分成多个段来发送,每个段都需要接收方进行一次应答ack确认,如果接收方没有应答还会再次重试发送保证消息的可靠抵达,等到一个段的发送和应答抵达完成后才发送下一个段,这种方式会降低系统的吞吐量,包的往返时间越长性能越差

    • 为了解决这个问题,TCP中引入了滑动窗口,滑动窗口大小决定了无需等待应答就可以继续发送的段数量的最大值,滑动窗口内的段数据才允许被发送,在接收方应答未到达前窗口停止滑动,只有滑动窗口中的数据的接收方应答回来了窗口才能向后滑动包含等待发送的段并发送段数据,而且接收方也相应地维护着一个滑动窗口,超出滑动窗口的待接收数据只有等滑动窗口内的数据接收完毕后才能执行接收操作

      • 滑动窗口起到了缓冲区的作用,避免数据发送的太快有多少数据发送多少数据,也避免必须发送完一条数据等到接收方相应以后再发送下一条数据导致发送太慢,同时也起到了流量控制作用

      • 因为TCP数据可能被分割成多个段,但是滑动窗口的大小有限;如果接收方网络数据量太大,数据接收到一半滑动窗口缓冲区不够用,接收方此时必须去缓冲区读取数据,就会发生半包现象;如果接收方的滑动窗口比较空闲,客户端发送了多条数据,接收方没有及时去滑动窗口读取数据,接收方的滑动窗口把多条数据都缓冲在滑动窗口中,此时接收方程序去缓冲区读取数据,就会发生黏包现象

    • TCP滑动窗口对应的底层实现就是TCP的接收缓冲区和发送缓冲区

  2. TCPNagle算法引起的粘包现象

    • 概念:TCP网络数据传输在传输层和IP层为数据添加报头,IP层的报头为20个字节,TCP层的报头也为20个字节,即使只传输一个字节数据,最后添加了报头的数据长度至少为41个字节,在某些情况下报头的长度远远大于数据内容的长度,这样非常不划算;因此出现了Nagle算法对这种情况进行优化,Nagle算法的原理是攒够了一定量的数据再发送,避免因为数据报头远远多于数据内容导致网络传输效率太低

      • 攒数据就带来了粘包问题

  3. NettyByteBuf设置的太大导致粘包现象

    • NettyByteBuf默认大小为1024字节,这个容量是比较大的,如果网络传输的都是很小的数据,很容易就会产生粘包现象

  4. NettyByteBuf设置的太小导致半包现象

    • NettyByteBuf容量小于通道中实际发送的数据量,需要分多次读取就会产生半包现象

  5. 链路层的MSS限制导致半包现象

    • 链路层是比TCP更底层的协议,链路层有一个MSS限制,不同的网卡对数据包的大小有限制,笔记本的网卡一般限制数据包大小为1500字节[包含TCP和IP报头40字节],只要发送的数据量大于1460字节,数据就会被切分发送,此时就会造成半包现象

      • 注意回环地址对MSS几乎没有限制,允许的数据包大小为65535字节,如果向局域网中的另外一台电脑数据网络传输就会应用MSS限制

      • 网卡对数据包的大小限制值叫MTU[数据链路层最大载荷长度]MSS是传输层的报文载荷长度[不包含报头40字节]MTU包含了MSSMTU减去IP头和TCP头就是MSS,当数据包大小超过了MTU时,路由器可能会把数据包分成更小的部分发送,如果不能分片,路由器会丢掉这个数据包,发送ICMP报文,告诉发送方数据包太大

解决方案

短链接

  1. 概念:

    • 客户端发送完一条消息立即把链接断开,客户端单次链接只会发送一条消息,NIO中客户端调用socketChannel.close()断开连接会触发服务端的可读事件,但是可读事件不会读取到任何内容并且返回值为-1Netty对这部分的处理方式不清楚;但是不管是NIO还是Netty一定能监听客户端的断开状态,服务端也能根据客户端的断开时机来确定单条消息的边界

      • 短链接本质上是人为使用客户端建立连接作为消息起始,客户端断开连接作为消息结束,使用客户端连接状态来标记消息的开始和结束,从而从解决TCP协议网络数据无边界的问题来解决粘包半包问题

    • 客户端需要发送消息时,我们调用bootstrap.connect()建立连接,单条消息发送完毕后我们在处理器中调用socketChannel.close()关闭连接

      • 注意调用了socketChannel.close()正常关闭客户端连接服务端不会抛异常,而且服务端无需对客户端正常关闭做任何处理

      • 经过测试,使用短链接方案客户端与服务端之间的数据通信不会出现粘包现象

      • 只要通道关闭重新建立连接即使是同一台机器也是全新的通道,可以被服务端负载均衡到不同的EventLoopGroup,这意味着短链接情况下服务端可以不管客户端的消息发送情况并发地接收同一台物理机器上的连续消息

  2. 局限

    • 短链接方案无法解决半包问题,TCP的接收缓冲区滑动窗口容量太小、ByteBuf的容量太小或者MSS限制仍然会产生半包现象

    • 每发一条数据都要建立一次连接,性能和效率比较低

 

定长解码器

  1. FixedLengthFrameDeacoder

    • 概念:Frame表示帧,指一条完整的消息,Decoder表示解码器是专门做消息解码的特定一类HandlerFixedLength的意思是固定长度;FixedLengthFrameDeacoder解码器是专门做固定长度消息解码的处理器

      • 注意固定长度消息是指服务端和客户端规定网络传输的每条消息的长度都是一样的,不管ByteBuf中的数据是否出现粘包或者半包现象,定长解码器都会按照固定的字节数划分单条消息并对单条消息解码,出现半包现象会等待后续消息并自动拼接出一条消息,遇到粘包现象会自动拆分出多条消息

      • 实际的消息很难每次保证长度一致,FixedLengthFrameDeacoder要求定长消息没有数据的位置不能空着,需要手动指定标识符表示空白消息,FixedLengthFrameDeacoder设置的定长消息长度需要事先找到可能发送的所有消息中长度最长的消息作为定长消息的长度,否则总会有消息无法被完整拼接到定长消息解码器中

      • FixedLengthFrameDeacoder对象可以通过new FixedFrameDecoder(10)实例化并指定定长消息的长度

      • FixedLengthFrameDeacoder以处理器Handler的方式加入到流水线中,使用方法和Netty的固有处理器是一样的,如socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(10))

        • FixedLengthFrameDeacoder需要添加到LoggingHandler之前,否则ByteBuf的数据还没有经过定长解码器处理粘包半包现象就被LoggingHandler打印到控制台了

        • 从测试现象来说,FixedLengthFrameDeacoder会拆分或者拼接出单条消息后才会让每一个单条消息去执行后续的流水线操作,不够单条消息就会阻塞流水线的执行

  2. 优势

    • 客户端可以一次性将任意形式的数据发送给服务端,粘包半包发都无所谓,消息发多少条也无所谓,反正服务端会自动将每条消息都处理出来,相较于短链接每次连接都断开效率要高得多

  3. 局限

    • 定长解码器在消息长度方差大的情况下会造成严重的内存浪费,无意义消耗网络带宽,因为每条消息都会发送最长消息的长度的字节,很多字节都是无意义的数据内容

 

行解码器



 

  1. LineBasedFrameDecoder

    • 概念:以换行符作为单条消息的分割符,支持换行符\n[linux平台的换行符]\r\n[windows平台的换行符],只会分割消息并将单条消息分发到后续流水线,不会打印单条消息内容

      • LineBasedFrameDecoder对象可以通过new LineBasedFrameDecoder(final int maxLength)来实例化,创建时必须指定行解码器的最大长度,如果解码单条消息时发现消息超过指定的最大长度还没有发现换行符,就会抛出TooLongFrameException消息太长异常,这是为了避免客户端发送的消息格式错误导致行解码器一直等待拼接消息

      • LineBasedFrameDecoder以处理器Handler的方式加入到流水线中,使用方法和Netty的固有处理器是一样的,如socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024))

      • 换行符\nloggingHandler中字符显示为.

      • 从测试现象来说,LineBasedFrameDecoder会拆分或者拼接出单条消息后才会让每一个单条消息去执行后续的流水线操作,不够单条消息就会阻塞流水线的执行

    • 局限:行解码器的效率比较低,因为多了一次遍历找分隔符确定消息长度创建对应ByteBuf的过程

  2. DelimiterBasedFrameDecoder

    • 概念:该行解码器使用用户自定义的分隔符作为单条消息的分割边界,只会分割消息并将单条消息分发到后续流水线,不会打印单条消息内容

      • DelimiterBasedFrameDecoder对象可以通过new DelimiterBasedFrameDecoder(int maxFrameLength,ByteBuf delimiter),创建时必须指定行解码器的最大长度并指定ByteBuf类型的分割符delimiter

      • DelimiterBasedFrameDecoder以处理器Handler的方式加入到流水线中,使用方法和Netty的固有处理器是一样的,如socketChannel.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(2048,Unpooled.copiedBuffer("\t".getBytes())))

      • 从测试现象来说,LineBasedFrameDecoder会拆分或者拼接出单条消息后才会让每一个单条消息去执行后续的流水线操作,不够单条消息就会阻塞流水线的执行

    • 局限:行解码器的效率比较低,因为多了一次遍历找分隔符确定消息长度创建对应ByteBuf的过程

LTC解码器

  1. LengthFieldBasedFrameDecoder[也叫帧解码器]

    • 概念:基于长度字段的消息解码器,该解码器要求传输的消息分成四个部分,第一个部分是模数字段,可以携带消息的其他信息[这部分长度可以为0];第二个部分是消息的长度数据即消息长度字段;第三个部分是消息附带内容[这部分长度可以为0];第四个部分是消息内容本身,即为了避免识别分割标识符来确定单条消息的长度会多出一次遍历导致性能较低直接在客户端发送消息时就通过消息长度字段来指定消息的具体长度;只会分割消息并将单条消息分发到后续流水线,不会打印单条消息内容

      • 解码时会先通过长度字段偏移量和长度字段长度读取当前消息的长度;再从消息中读取对应的一条消息并处理粘包半包现象;

      • 解码后的消息仍然会保留模数字段,长度字段,消息附带内容,这些数据原封不动放在了ByteBuf中消息的前面,如果我们不想要长度字段和模数字段,可以在LengthFieldBasedFrameDecoder构造时指定入参initialBytesToStrip指定解码后的以长度字段打头的消息要从头去掉几个字节,通过这种方式我们只获取到消息内容本身或者对消息进行更进一步处理;

      • LengthFieldBasedFrameDecoder对象可以通过构造方法new LengthFieldBasedFrameDecoder(int maxFrameLength,int lengthFieldOffset,int lengthFieldLength,int lengthAdjustment,int initialBytesToStrip)来创建,

        • 参数maxFrameLength是指定消息的最大长度,超过该长度的消息直接抛TooLongFrameException异常,

        • lengthFieldOffset消息长度字段偏移量,即消息长度字段部分从一条消息的哪一个索引位置开始,因为可能存在模数字段,长度字段并不总是从索引0处开始

        • lengthFieldLength消息长度字段长度,是指定表示消息的长度字段占用的字节数

        • lengthAdjustment以长度字段的最后一位为标志,还有指定个字节后的内容才是消息本身,指定个字节实际上就是消息附带内容第三部分的长度

        • initialBytesToStrip从解码后以长度字段打头的消息要从头去掉指定个字节,这样可以去掉消息中的长度字段和其他信息只保留单条消息本身

    • 用法

      • LengthFieldBasedFrameDecoder以处理器Handler的方式加入到流水线中,使用方法和Netty的固有处理器是一样的,如socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,0,4,0,0))

        • 从测试现象来说,LengthFieldBasedFrameDecoder会拆分或者拼接出单条消息后才会让每一个单条消息去执行后续的流水线操作,不够单条消息就会阻塞流水线的执行

        • 字节数可以通过byteBuf.writeInt(length)等方式写入,然后调用byteBuf.writeBytes(bytes)写入消息内容本身

        • 如果写入数据的格式不对,取数据一旦混乱,后续的读取长度数据就会直接因为读取不到或者读取错误直接抛出异常

      • 写数据的时候需要结合业务需要按照LengthFieldBasedFrameDecoder构造方法中设计的格式向ByteBuf中写入完整格式的消息,写多少条消息不管粘包半包没有要求

      • 注意帧解码器最好不要设置成多个EventLoop共用,即把同一个EventLoop对象设置到多个流水线中,应该每个流水线都创建一个独立的帧解码器实例,因为帧解码器不是线程安全的,如果作为共享资源被多个线程共享,就可能出现多个EventLoop的消息混在一起的情况,特别是像帧解码器这种处理半包现象会阻塞等待后续消息的处理器如果被多个线程共享极容易发生其他线程拼接消息到帧解码器上一条还暂存的半包消息,导致两条消息都完全错乱,像帧解码器这种只要能记录多条消息状态的处理器就不能将同一个处理器实例加到多个流水线中

        • Netty为所有支持多线程共享的处理器添加了@Sharable注解,这些处理器Netty充分考虑了其各种场景下的线程安全性,可以添加到不同的流水线中供多个eventLoop共享

         

协议解析与设计

  1. Redis通信协议简介

    • 命令通信数据格式为*#回车键换行键$#回车键换行键元素1数据内容回车键换行键$#回车键换行键元素2数据内容回车键换行键...

      • #:代指数字,实际上在通信数据中要替换成对应的表示元素个数和每个元素占字节数的对应数字字符

      • *#:指定命令的元素个数[即单词个数]#是命令的单词个数

      • $#:指定当前元素的数据内容字节个数,#是当前单词的字节个数

      • 元素数据内容:当前单词对应的字节

      • 回车键换行键*#*$以及元素数据内容之间全部使用回车键换行键进行分隔[回车键和换行键对应的字节码分别为1310]

      • 格式为[元素个数][元素1字节数][元素1实际内容][元素2字节数][元素2实际内容][元素3字节数][元素3实际内容]...,每个中括号[中括号只是方便理解,实际消息中没有中括号]代表的部分之间使用[回车键换行键]作为分隔符

    • 比如用户发送一条命令set name zhangsan[Redis会将整条命令视为一个数组]Redis要求发送的通信数据格式为*3*3表示命令的组成元素个数;$3set中的$3表示第一个元素的长度为3,元素内容为set,对应的$4name表示第二个元素为四个字节,元素内容为name;每个单元之间需要使用回车符换行符两个字节分隔;set name zhangsan对应的命令通信数据格式为*3回车符换行符$3回车符换行符set回车符换行符$4回车符换行符name回车符换行符$8回车符换行符zhangsan回车符换行符

      • *3回车符换行符$3回车符换行符set回车符换行符$4回车符换行符name回车符换行符$8回车符换行符zhangsan回车符换行符这个数据内容原封不动发给Redis服务器的6379端口就能在redis中成功执行命令set name zhangsanRedis命令执行成功会返回消息+OK回车键换行键对应的字节

    • 协议的意义

      • 只要用户遵照服务端的通信协议向服务端发送通信消息,服务端就能解析用户的意图并执行用户希望的操作并与客户端进行语言通信

      • Netty提供了很多像Redis的通信协议、HTTP协议、HTTPS协议、WebSocket协议等等,我们只要按对应的协议要求配置相应内容Netty就能帮我们生成对应协议格式的通信数据,无需每条消息我们再去手动拼接出实际消息如*3回车符换行符$3回车符换行符set回车符换行符$4回车符换行符name回车符换行符$8回车符换行符zhangsan回车符换行符

 

HTTP协议编解码器


  1. 概念:

    • Netty中提供了一个HTTP协议编解码器HttpServerCodecHttpServerCodec继承自CombinedChannelDuplexHandler[CombinedChannelDuplexHandler组合了其泛型中列举的两个Handler,一个是HttpRequestDecoder对客户端请求的解码处理器,另一个是HttpResponseEncoder响应给客户端的编码处理器],在Netty中只要类的名字以Codec结尾就说明该类同时包含了解码和编码功能

      • HttpServerCodec既是入站处理器又是出站处理器,作为入站处理器时HttpRequestDecoder生效,作为出站处理器时HttpResponseEncoder生效

    • HttpServerCodec中的入站处理器HttpRequestDecoder会对请求通信数据进行解码,会将通信数据解码成HttpRequest即请求头,请求行[实际上HttpRequest是接口,对应的子实现类是DefaultHttpRequest]以及HttpContent请求体[对应的子实现类是LastHttpContent$1,即使是GET请求没有请求体也会解码出请求体消息,只是其中没有内容]两条消息

      • 因此我们在处理器HttpServerCodec的后续流水线处理时,需要使用msg instanceof HttpRequestmsg instanceof HttpContent来判断消息是请求头还是请求体[使用接口是为了避免实现发生变化]来分别进行处理

      • 对于GET请求我们只关心请求头信息无需关注请求体消息,用户可以通过自定义一个SimpleChannelInboundHandler<T>在泛型中添加消息的特定类型,可以实现处理器只对特定类型的消息起作用,比如socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>())即处理器SimpleChannelInboundHandler只会对HttpRequest类型的消息生效,如果是HttpContent类型的消息该处理器就会跳过不执行

        • SimpleChannelInboundHandler<T>是入站处理器,通过重写ChannelRead0(ChannelHandlerContext ctx,T msg)来指定对特定消息的入站处理操作

      • HttpRequest

        • httpRequest.uri():获取请求行信息

        • httpRequest.headers():获取请求头信息

        • httpRequest.protocolVersion():获取请求的HTTP协议版本

    • 响应数据需要使用DefaultFullHttpResponse对象进行封装,HttpServerCodec中的出站处理器HttpResponseEncoder会将DefaultFullHttpResponse对象编码成符合HTTP协议格式的ByteBuf将字节数据返回给浏览器

      • DefaultFullHttpResponse:该对象可以通过new DefaultFullHttpResponse(HttpVersion version,HttpResponseStatus status,ByteBuf content,boolean validateHeaders)来实例化,

        • versionHTTP协议的版本,一般和请求的HTTP协议版本保持一致

        • status是响应状态码,HttpResponseStatus.OK就是响应状态码200

        • ByteBuf byteBuf=defaultFullHttpResponse.content():该方法可以获取到响应体对应的ByteBuf对象即构造方法中的content对象,通过向该ByteBuf写入响应体数据就能将响应体写入响应

        • 通过调用channelHandlerContext.writeAndFlush(defaultFullHttpResponse)可以将响应写回给客户端[实际上此时就已经实现了一个服务器,浏览器此时可以直接给服务器正常发起请求,服务器响应的响应体也可以直接被浏览器识别展示,非常地牛逼,相当于实现了一个极简版的Tomcat]

          • 但是因为没有在响应头中指定响应HTTP协议格式的消息的长度content-length,浏览器不知道消息何时接收完毕认为消息没有接收完会一直转圈等待接收更多的响应数据;可以通过defaultFullHttpResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH,响应体的字节长度)来向响应头中设置响应体的字节数

          • 浏览器除了发送用户发起的请求还会在用户请求响应后额外发起一个/favicon.ico【GET】请求来获取站点的图标

    • 不止是HTTP协议,几乎常见的协议Netty都提供了对应的处理器,我们可以使用对应不同协议的处理器快速开发各种网络业务系统或者服务器

 

自定义协议



  1. 自定义协议涉及的概念:

    • 魔数:在数据接收前首先根据魔数判断消息是否是有效消息,一般发送消息的头几个字节总是魔数

      • Java的二进制字节码起始几个字节始终是cafebaby

    • 版本号:有版本号就能支持协议升级,根据版本号可以判定消息的具体格式

    • 序列化算法:就是对象转换为传输消息的具体方法

    • 消息正文:就是传输的消息本身

      • 实际上传输的消息一般都是很复杂的,正文一般都组织称特定格式的字符串,比如Jsonxml或者Java中的对象流

      • 这个过程就涉及到把服务器内的对象转换成特定组织格式的消息,接收到消息后将消息解码为对应的对象,这个过程就叫队列序列化,对象序列化是将对象以各个平台通用的一种格式来进行传输,常见的序列化算法有jsonJdk的序列化算法[JDK的序列化算法不能跨语言平台、性能也不好]、谷歌的protobufhessian[protobufhession都是基于二进制的,可读性不好,但是字节数更少,性能更高]

      • 序列化方式一般通过一个标识符在消息中指定

    • 指令类型:指消息类型,和业务相关,标识消息是哪一项具体业务的消息

    • 请求序号:就是消息的唯一标识,为了给双工通信提供异步能力,如果没有唯一标识,消息就只能收发一条完成后才能发送下一条,有了请求序号可以一次性发送一批消息,根据响应消息的序号可以识别具体消息的对应响应

    • 正文长度:通过正文长度可以界定一条消息的边界,方便处理TCP网络通信中的粘包半包问题

  2. Netty中的自定义协议[这是测试用例,不要在生产中使用]

    • 自定义消息的编解码器:通过该编解码器可以实现消息和ByteBuf的互转

      • 通过继承NettyByteToMessageCodec<T>重写其中的encode方法实现消息正文对象向ByteBuf的转换,重写其中的decode方法实现ByteBuf对象向消息正文的转换,说白了就是在编码方法中定义将消息正文按照自定义协议转换成传输数据字节的方法,在解码方法中对编码方法逆向定义将传输字节数据按自定义协议转换成消息对象的方法

      [消息抽象父类]

      [消息子实现类]

      [编解码器]

      • 这个代码没有考虑粘包半包情况,粘包好解决,因为先读取到消息正文的长度可以在程序中简单处理;半包比较复杂,假如客户端数据分多次发送,如果服务端不能完整获取到消息正文,消息正文字节数组反序列化为Java对象的时候就会抛出异常

      • 在编解码器前面添加基于长度字段的LengthFieldBasedFrameDecoder帧解码器可以解决自定义协议的粘包半包问题

      [向ByteBuf写入的传输数据内容]

      • 除开前16位是我们自己指定后,后续字节是使用JDK序列化Message对象得到的字节,其中很多内容ByteBuf本身是无法识别的,需要使用JDK的序列化器将其反序列化为Message对象

      [从ByteBuf中解析出的消息对象]

 

解决粘包半包

  1. 使用帧解码器解决自定义协议中的粘包半包问题

    • 这里帧解码器的配置是new LengthFiledBasedFrameDecoder(1024,12,4,0,0),分别为消息的最大长度为1024,长度字段偏移量为12,长度字段本身的字节数为4个字节,长度字段后有0个字节非消息正文内容,我们使用自定义的MessageCodec处理完整的网络传输消息,因为从头开始截取的字节长度也为0,一旦截取了后续使用MessageCodec解析消息就会出错抛异常

    • 这个帧解码器接收到粘包消息后会自动分割每条消息并让每条消息都去执行后续的流水线,接收到半包消息会直接阻塞直到接收到后续完整的消息才会将消息放行后续的流水线

    • 编解码器一般会涉及到记录多次消息的状态,因此编解码器一般不能线程间共享,也不会添加@ChannelHandler.@Sharable注解;我们自己自定义的适配业务通信协议的编解码器可以事先经过帧解码器处理得到单条消息,可以避免记录消息的状态,从逻辑上可以加@Sharable注解,但是自定义的编解码器需要继承抽象父类ByteToMessageCodec<T>,父类的文档中明确说明ByteToMessageCodec<T>的所有子实现类上不能标注@Sharable注解,标注了启动就会报错

      • 因为ByteToMessageCodec<T>的子类都是用户自定义的编解码器,Netty在设计的时候就认为用户自定义的编解码器需要处理粘包半包消息会涉及到保存状态,因此ByteToMessageCodec<T>在构造方法中调用ensureNotSharable()方法来确保子类在构造时不能添加@Sharable注解,在isSharable()方法中调用class.isAnnotationPresent(Sharable.class)方法来判断当前类及其子类上是否添加了@Sharable注解,如果添加了@Sharable注解子类调用构造方法实例化时就会报错

      • 如果必须要在自定义编解码器上添加@Sharable注解,可以让自定义编解码器继承MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>Netty定义MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>是为了让用户明确这是消息到消息的转换,使用自定义编解码器处理消息前就已经解决了消息的粘包半包问题不会涉及到编解码器记录消息的状态,其中INBOUND_IN是传入当前处理器的消息类型,OUTBOUND_IN是处理后要传递给后续流水线的消息,该抽象父类的子实现类对应的也需要实现encode方法和decode方法,通过继承MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>就能实现自定义编解码器上添加@Sharable注解

      • 实际上这只是一种技法,实际上能不能加@Sharable注解是由编解码器的编解码方法需不需要通过记录消息状态来解决如消息粘包半包问题来决定的

     

序列化算法扩展

  1. 常见序列化算法

    • JDK序列化算法

      • 序列化时使用ObjectOutputStream(new ByteArrayOutputStream()).writeObject(object),通过其中的byte[] bytes = byteArrayOutputStream.toByteArray()获取对象序列化后的字节数据

      • 反序列化时使用T ---> (T) ObjectInputStream(new ByteArrayInputStream(byte[])).readObject()获取通过字节数据反序列化后的数据对象

        • 注意JDK序列化器会默认将对象转换成Object类型,此时可以使用对象的父接口来向下强转,但是使用第三方提供的序列化器不能直接指定父接口或者父抽象类,因为第三方序列化器是直接创建对应类型的对象,必须指定目标对象的实际类型

    • JSON序列化算法

      • 谷歌的Gson序列化器

        • gsonClass类型对象转换成json格式字符串比如gson.toJson(String.class)会抛UnsupportedOperationException,提示用户忘记注册类型转换适配器,用户对特定类型需要指定类型转换适配器gson才能将特定类型转换成json字符串

          • 有了自定义类型转换适配器通过new GsonBuilder().registerTypeAdapter(Class.class,new CustomGsonAdapter()).create();方法创建gson对象,再通过Gson对象的gson.toJson(String.class)去将class对象转换成json字符串就不会报错了

          [gson自定义类型转换适配器]

           

         

  2. 扩展序列化算法设计

    • 定义一个接口,支持配置多种序列化算法对数据进行序列化[将java对象转换成字节数组]和反序列化[根据对象类型和字节数组将字节数组还原为Java对象,JDK的序列化数据中已经保存了目标类型信息,像JSON这种反序列化方式必须明确指定字节数据要反序列化的目标类型对象,否则相关的API就无法工作]

      • 要使用其中一种实现可以直接通过byte[] ---> Serializer.Algorithm.Java.serialize(message)将消息正文序列化为byte数组,通过Message message = Serializer.Algorithm.JDK.deserialize(Message.class,bytes)byte数组反序列化为Java对象

      • 枚举值有一个ordinal()方法,第一个枚举值的ordinal()方法返回0,第二个枚举值的ordinal()方法返回1,依次类推

      • 枚举类还有一个values()方法,可以获取一个枚举类的所有枚举值,并以数组的形式返回,我们可以从消息中获取枚举值对应的序号,通过Serializer.Algorithm.values()[枚举值对应序号]来获取对应序列化方式的枚举值,通过Serializer.Algorithm.values()[枚举值对应序号].serialize(messageActualClass,bytes)来反序列化字节数据到Java对象

    • 编写配置类让具体使用哪种序列化方式变成用户配置文件可配置

      • 用户可以根据自己在配置文件对序列化方式的配置通过SerializerConfig.getSerializerAlgorithm()拿到用户配置的枚举值,通过SerializerConfig.getSerializerAlgorithm().ordinal()获取枚举值对应的单字节序号,我们可以通过该序号在消息中设置用户选取的序列化方式,通过SerializerConfig.getSerializerAlgorithm().serialize(message)对消息正文进行序列化或者通过SerializerConfig.getSerializerAlgorithm().deserialize(Message.class,bytes)对消息进行反序列化

       

 

 

聊天业务设计

  1. 服务端接口[不包含离线用户的管理]

    • 用户管理接口

      • 用户登录功能:根据用户名和密码以及短信验证码判断用户身份,验证成功登录用户并将用户保存到session

    • 单个用户会话管理接口[管理用户名和相应的Channel信息]

      • 根据用户唯一标识和客户端通道绑定用户和客户端通道

      • 根据连接通道解绑用户和客户端通道的联系

      • 为一个绑定关系设置额外的任意类型属性

      • 获取一个绑定关系的额外任意类型属性

      • 根据用户唯一标识获取客户端连接通道

      • 根据客户端通道获取用户信息

    • 聊天组会话管理接口

      • 创建一个聊天组:聊天组的名字需要唯一,创建聊天组时需要设置聊天组的初始成员

      • 将用户添加到聊天组,聊天组不存在加入聊天组失败

      • 从聊天组中移除单个用户,聊天组不存在加入聊天组失败

      • 移除聊天组

      • 根据聊天组的名称获取聊天组的所有成员

      • 根据聊天组的名称获取所有在线成员的客户端通道

  2. 项目包结构

    • 客户端代码的client

    • 消息抽象父类和所有子实现的message

    • 编解码器、协议相关的类、Netty提供的类对消息预处理[统一在结构层面对Netty的工具类做约束方便管理]protocol

    • 服务端代码的server

  3. 登录业务逻辑

    • 验证用户名和密码,登录成功保存用户登录状态,登录失败跳转失败页,以下的逻辑很简陋,只是作为一种业务设计上的参考

      • 客户端:给客户端流水线添加一个自定义处理器

        • 重写channelInboundHandlerAdapter.channelActive(channelHandlerContext)方法定义在客户端与服务端连接建立后客户端要执行的操作,发起异步任务等待用户输入用户名和密码,线程间的协作可以通过countDownLatch来实现,两个线程之间可以通过共享变量分享线程的业务状态信息

        • 重写channelInboundHandlerAdapter.channelRead(channelHandlerContext,msg)接收客户端的响应消息,如果客户端登录成功,执行登录的线程可以进入死循环等待用户输入的消息

      • 服务端:

        • 给服务端流水线添加一个自定义处理器,重写simpleChannelInboundHandler<LoginRequestMessage>.channelRead0(channelHandlerContext,loginRequestMessage)处理登录业务逻辑,登录成功或者失败都向客户端响应LoginResponseMessage告知登录结果,登录成功将用户和通道的对应关系保存到自定义缓存中

  4. 客户端向服务端消息发送逻辑

    • 客户端封装一个单人聊天消息类ChatRequestMessage,构造的时候传参消息发送者、消息接收者和消息正文,发送消息时直接调用channelHandlerContext.writeAndFlush(chatRequestMessage)发送消息[注意writeAndFlush方法的入参类型是Object,这个消息在发送前会自动经过出站处理器转换成ByteBuf]

    • 封装群聊消息类GroupChatRequestMessage,封装消息发送用户、群聊组的唯一标识和消息正文

    • 封装创建聊天组消息类GroupCreateRequestMessage,封装聊天组的名字和初始用户列表

    • 封装获取聊天组成员消息类GroupMembersRequestMessage,封装聊天组的名称

    • 封装加入聊天组消息类GroupJoinRequestMessage,封装用户名和聊天组名

    • 封装退出聊天组消息类GroupQuitRequestMessage,封装用户名和聊天组名

    • 客户端退出时,关闭与服务端的连接通道,服务端检测到客户端连接断开,会自动触发通道的inactive事件断开连接并做一些后续处理

  5. 服务端单聊消息业务处理逻辑

    • 封装一个单聊消息对应的处理器,对协议的编解码抽取过一个单独的编解码器,直接放在当前处理器前面作为入站处理器即可

      • 根据用户名查询目标用户的通道,如果用户没有上线将消息固化到数据库;如果查询到目标用户的通道,直接通过该通道将数据写出到客户端

      • 注意弹幕提及被抽取出来的处理器一定要加@Sharable注解,只要有一个不加,就会导致客户端只能链接一个,在启动第二或者更多客户端会显示链接失败。推测应该是netty为了保护数据安全的机制

    • 依次在流水线上加入帧处理器的用户自定义子类实现ProcotolFrameDecoder处理可能存在的粘包半包消息,记录日志的处理器LoggingHandler,将ByteBuf按照我们自定义的协议转换成Message对象的自定义处理器MessageCodec,处理用户登录状态的自定义LoginRequestMessageHandler,处理单聊消息的自定义ChatRequestMessageHandler

  6. 服务端创建群聊消息业务处理

    • 封装一个根据创建群聊消息来创建群聊的自定义处理器,父类为SimpleChannelInboundHandler<GroupCreateRequestMessage>,重写channelRead0(channelHandlerContext,groupCreateRequestMessage)方法,根据群聊创建信息创建群聊并设置群聊初始信息,创建成功向客户端发送群聊成功创建消息,向所有初始群聊用户发送拉群成功消息;创建失败向客户端发送群聊创建失败消息

    • 在上述流水线后面添加该处理器

  7. 服务端群聊消息业务处理

    • 封装一个群聊消息对应的处理器,父类为SimpleChannelInboundHandler<GroupCreateRequestMessage>,重写channelRead0(channelHandlerContext,groupCreateRequestMessage)方法,根据群聊名称获取到群聊中的所有用户,给所有群聊用户发送消息

    • 在上述流水线后面添加该处理器

  8. 服务端退出客户端业务处理

    • 客户端退出分为正常退出和异常退出,正常退出会触发服务端对应通道的inactive事件,异常退出服务端多路复用的选择器对应的线程就会抛出异常,处理器相应封装了exceptionCaught(channelHandlerContext,Throwable cause)在选择器抛出异常时自动调用,注意此时服务端还是会自动打印异常的堆栈信息

    • 封装一个退出客户端的业务处理器,父类为ChannelInboundHandlerAdapter,重写channelInactive(channelHandlerContext)方法,客户端正常退出情况下从会话管理器中移除用户和对应的连接通道;重写exceptionCaught(channelHandlerContext,Throwable cause)方法,客户端异常退出情况下从会话管理器中移除用户和对应的连接通道

 

 

连接通道假死

  1. 服务端空闲检测

    • 网络编程中很容易出现连接假死的情况,比如网络设备出现故障,网卡故障、网线断掉、机房停电;底层的TCP连接已经断开,但是应用程序不会知道该情况,仍然在服务端保持着对应的资源,占用服务端的资源,服务端的最大连接数一般都是被限制了的,老的无用连接不被释放,会降低系统处理客户端通信消息的性能

    • Netty提供了检测连接假死的空闲状态检测器的处理器IdleStateHandler,该处理器判断连接可能存在问题的原理是判断读或者写之后的空闲时间太长

      • IdleStateHandler的构造方法new IdleStateHandler(int readerIdleTimeSeconds,int writerIdleTimeSeconds,int allIdleTimeSeconds),入参readerIdleTimeSeconds是设置检查读取数据的空闲时间超过指定时间,writerIdleTimeSeconds是设置检查写入数据的空闲时间超过指定时间,allIdleTimeSeconds是设置读写的空闲时间都超过指定时间

      • 如果超过readerIdleTimeSeconds时间还没有触发读操作会触发一个IdleState#READER_IDLE事件,这个事件会自动经过当前流水线,因为读取空闲超时和写出空闲超时触发的事件分别执行的是入站处理器和出站处理器,因此处理空闲超时事件的处理器应该是双向处理器

        • Netty中自定义双向处理器可以使用ChannelDuplexHandler的匿名实现,既可以作为入站处理器,也可以作为出站处理器

          • channelDuplexHandler.userEventTriggered(channelHandlerContext ctx,object evt)只有用户自定义的事件或者IdleState下的各种事件才会触发,其中入参Object evt就是事件本身,注意IdleState#READER_IDLE事件的类型是IdleStateEvent

    • 业务中如果客户端的空闲时间达到了服务端设置的最大空闲时间,服务端一般不会直接调用channel.close()关闭客户端连接,这样客户端可能需要时间很长的操作在中间断掉了,这样会导致服务端本意处理连接假死却导致很多正常使用的客户端被误伤

      • 避免假死误判的方法可以让客户端定时向服务端发送数据,这中数据被称为心跳数据包,系统设计时可以设置一个最大写空闲时间,设置一个双向处理器重写channelDuplexHandler.userEventTriggered(channelHandlerContext ctx,object evt)处理该IdleState#WRITER_IDLE事件向服务端写出心跳数据[注意客户端最大写空闲时间一般是服务端的最大读空闲时间的一半,流量就是这么没的],服务端读取数据一旦超过最大读空闲时间,就说明客户端连接肯定假死,此时服务端可以直接结束假死的客户端连接

 

TCP三次握手

  1. TCP三次握手流程

    • 组件:

      • 客户端:

      • 服务端:

      • 半连接队列sync queue:服务端将还没有完成三次握手的连接信息存入半连接队列

      • 全连接队列accept queue:服务端将已经完成三次握手的连接信息会存入全连接队列,全连接队列的容量大小决定了服务端能存放多少个已建立连接的客户端,一旦客户端连接数量超过队列容量,服务端会发送一个拒绝连接的错误信息给客户端,客户端会抛出ConnectException连接异常

    • 流程

      • 服务端调用bind()方法绑定通信端口

      • 服务端调用listen()方法监听连接请求

      • 客户端调用connect()方法发起一个连接请求数据包SYN,客户端的状态变成SYN_SEND状态,

        • 这是第一次握手

      • 服务端接收到SYN数据包会将数据包封装成一个连接信息对象存入半连接队列,服务端的状态变成SYN_RCVD状态

      • 服务端向客户端响应SYN数据包和ACK应答数据包,

        • 这是第二次握手

      • 客户端接收到SYN+ACK数据包表明服务端收发消息的能力正常,客户端状态变成ESTABLISHED状态

      • 客户端向服务端响应ACK数据包表明客户端接收服务端消息的能力正常,服务端状态变成ESTABLISHED状态

        • 这是第三次握手

      • 服务端将连接信息转移到全连接队列

      • 服务端调用accept()方法从全连接队列中获取连接对象进行数据操作,连接信息被accept处理以后就会从全连接队列中移除,只有accept()方法处理能力有限时连接信息才会在全连接队列中进行堆积

  2. backlog参数

    • linux2.2前,backlog一个参数就控制了半连接队列和全连接队列的大小,这个linux版本很旧,现在不会使用这么旧的版本

    • linux2.2以后,linux提供了两个系统配置文件来分别配置两个连接队列的容量大小

      • 半连接队列的大小通过配置文件/proc/sys/net/ipv4/tcp_max_syn_backlog指定

      • 全连接队列的大小通过配置文件/proc/sys/net/core/somaxconn指定,程序中NIO可以通过bind(port)的重载方法bind(port,backlog)配置backlog参数;Netty中的基本组件ServerBootstrapBootstrap没有该bind(port,backlog)方法,需要通过serverBootstrap.option(ChannelOption.SO_BACKLOG,1024)来进行配置,生产环境中这个参数应该配置的大一些,至少都要1024

        • 该配置文件/proc/sys/net/core/somaxconn中只有一个数字就是backlog参数值

        • 如果同时通过linux系统的配置文件和程序的bind(port,backlog)参数指定了backlog参数,会自动选取两个backlog参数中较小的参数值,Netty中的程序指定backlog参数默认值可以通过类DefaultServerSocketChannelConfig中的backlog属性值查看,取的默认值为NetUtil.SOMAXCONN,赋值语句为somaxconn=PlatformDependent.isWindows()?200:128;,该语句的含义是如果操作系统是windows全连接队列的容量就为200,不是windowsLinux或者Mac就是128;然后会去读取配置文件/proc/sys/net/core/somaxconn,如果该文件存在就会以该配置文件的数字作为全连接队列的容量backlog

        • 实际上Nettyaccept方法处理能力是很强的,将全连接队列设置的大一些可以避免高峰期队列被快速堆满导致客户端连接被频繁拒绝

  3. ulimit -n 数值参数

    • 该参数限制一个进程最大能打开的文件描述符的数量,linux中不论文件描述符还是socket都是用文件描述符FD来表示,当进程打开文件超过该参数设置的上限,进程再想打开文件会报错TooManyOpenFile,该设置是为了避免文件或者socket打开太多伤害系统,服务端如果要做高并发支持大量的客户端连接一定要调整该参数

    • 该命令在linux中是一个临时生效命令,一般搭配进程启动命令一起组成启动脚本

 

搭建RPC框架

  1. 搭建一个RPC框架的准备工作

    • 创建消息类型:RPC请求消息类型RPCRequestMessage、RPC响应消息类型RPCResponseMessage

    • RPCRequestMessage请求消息设计

      • 远程调用的目标接口名称[接口名称是实际java中的接口,不是Web开发中的接口]

      • 远程调用的目标方法名

      • 目标方法的返回值类型

      • 目标方法的入参类型数组

      • 目标方法的入参实际值数组

    • RPCResponseMessage响应消息设计

      • 目标方法实际的返回值

      • 目标方法可能发生的异常

    • 服务端流水线

      • 处理粘包半包的帧解码器ProcotolFrameDecoder

      • 日志处理器LoggingHandler

      • 自定义协议的编解码器

      • 处理RPC请求消息的处理器RPCRequestMessageHandler

    • 客户端流水线

      • 处理粘包半包的帧解码器ProcotolFrameDecoder

      • 日志处理器LoggingHandler

      • 自定义协议的编解码器

      • 处理RPC响应消息的处理器RPCResponseMessageHandler

  2. 实现RPCRequestMessageHandler

    • 根据接口名称找到接口实现类对象[通过对象获取到类对象反射调用指定方法]

      • Spring容器中直接通过名字获取容器实例即可

      • 原生的Java可以通过Class.forName("全限定类名")获取到对应类的class对象,通过服务端保存的class对象和对象实例对应关系获取对应的对象实例service

    • 通过对象实例的类对象service.getClass()getMethod(methodName,parameterType[])获取到方法对象method,通过方法对象的Object res=method.invoke(service,parameterValve[])指定执行方法的对象和参数值来调用对应方法,返回值res就是方法执行的结果

    • 将执行结果封装到RPCResponseMessage[如果出现异常将异常对象封装到RPCResponseMessage中],将消息序列号封装到RPCResponseMessage,通过channelHandlerContext.writeAndFlush(rPCResponseMessageHandler)将消息经过流水线处理写出到客户端

  3. 实现RPCResponseMessageHandler

    • 直接打印响应结果

  4. 创建RPCClientManagerRPC客户端管理器对象

    • 为用户提供channel对象发送消息

      • 等客户端启动器创建建立连接后返回channel对象,将channel对象赋值给RPCClientManagerchannel属性方便用户发起远程调用请求时随时取用

      • 编写回调方法channel.closeFuture().addListener(ChannelFutureListener),用户调用channel.close()channel连接通道被关闭以后会异步自动调用channelFutureListener.operationComplete(ChannelFuture future)方法来处理通道关闭后的逻辑,比如调用nioEventLoopGroup.shutdownGracefully()优雅关闭事件循环组并结束客户端的执行

      • 一个客户端只提供一个Channel,因此创建客户端启动器建立连接返回channel对象的代码只能执行一次,在获取channel对象时先检查channel对象是否不为null,不为null直接返回channel,如果为null再去重建channel,使用双重检查锁避免多线程并发重建channel导致channel被执行多次

    • 根据用户调用的方法封装RPCRequestMessage消息

    • RPCClientManager中为用户提供接口动态代理对象的getProxyService()方法将用户调用远程方法的行为转换成向远程服务发送远程调用消息的行为

      • 这里演示使用的是JDK动态代理

      • 远程方法调用完响应消息被客户端接收后会被流水线处理成自定义的RPCResponseMessage,流水线的处理得到RPCResponseMessage是在线程eventLoop中完成的,动态代理对象一般在用户线程中完成,这里涉及到两个线程间共享同一个数据的问题,使用Promise容器可以实现多个线程间交换同一个数据

        • 准备一个ConcurrentHashMap以消息序号作为key,以Promise作为值缓存远程调用响应消息经流水线处理后的结果,为了保证多线程并发共享数据的线程安全性使用了concurrentHashMap<Integer,Promise<?>>[?是通配符,表示适配任意类型,这是因为不知道响应的结果是什么类型,注意这里用通配符不行,后续向Promise对象中设置值会出现问题,将通配符改成Object类型,泛型通配符有一个只能从泛型容器中获取值,不能向泛型容器即泛型类型变量设置值的特性,编译会报错,但是可以设置null值]Promise对象由代理对象通过DefaultPromise<?> promise = new DefaultPromise<>(channel.eventLoop())创建后存入ConcurrentHashMap[入参channel.eventLoop()是创建Promise对象需要指定将结果传入Promise对象的线程EventExecutor对象,需要流水线的执行线程即channel.eventLoop()对象,注意该concurrentHashMap老师设置为RPCResponseMessageHandler的一个公有静态变量],代理对象创建Promise对象并将其存入concurrentHashMap后调用promise.await()或者promise.sync()等待eventLoop接收到响应将结果存入promise对象,使用promise.await()使用promise.isSuccess()来判断是否正常成功获取消息,成功获取响应结果直接获取结果设置为代理对象对应方法的返回结果,如果没有成功获取响应结果,包装异常对象promise.cause()直接通过代理对象抛出

    • RPCResponseMessageHandler中增加将流水线处理结果存入concurrentHashMap中的Promise对象的逻辑

      • 通过消息的序列号从concurrentHashMap中获取消息对应的Promise对象,检查远程调用的响应结果是否正常,如果正常调用promise.setSuccess(returnValue)设置远程调用执行结果,如果远程调用有异常就调用promise.setFailure(exceptionValue)将异常信息设置到Promise[注意Gson对Throwable对象向json字符串的转换不需要自定义转换适配器],为了避免序列号错误或者其他错误需要对promise对象判空,只有非空才能进行设置值操作,否则会出现空指针异常,因为各种原因导致集合中没有对应promise对象是可能出现这种问题的

  5. 典型问题

  1. 要点

    • 远程调用时的代理设计,线程间交换数据,协议设计,黏包半包处理

     

 

源码剖析

启动流程

  1. Netty的启动流程

    • NIO的启动流程

      • Selector selector =Selector.open()创建选择器selector

      • ServerSocketChannel ssc = ServerSocketChannel.open()创建服务端Socket连接通道

      • SelectionKey selectionKey = ssc.register(selector,0,attachment)向选择器注册通道和相应附件,入参0表示通道还没有关注任何事件

      • serverSocketChannel.bind(new InetSocketAddress(8080))为通道绑定服务端通信端口

      • selectionKey.interestOps(SelectionKey,OP_ACCEPT)通过通道对应事件监听对象设置通道的事件监听类型

    • Netty的启动流程[NettyNIO监听连接事件的四行代码处理封装了四百行,主要执行的操作如下]

      • Netty中使用封装了线程池和选择器Selector selector =Selector.open()NioEventLoopGroup来管理客户端请求事件

      • NioServerSocketChannel attachment = new NioServerSocketChannel()创建Netty中的NioServerSocketChannel,同时初始化通道关联的处理器流水线以及NioServerSocketChannel对应的Config配置对象

      • ServerSocketChannel serversocketChannel = ServerSocketChannel.open()创建JDKNIO原生的serverSocketChannel,并通过serverSocketChannel.configureBlocking(false)将通道设置为非阻塞模式

      • SelectionKey selectionKey = ssc.register(selector,0,attachment)NioServerSocketChannel作为附件并将原生通道serversocketChannel注册到选择器中,0表示尚未关注任何事件

      • serverSocketChannel.bind(new InetSocketAddress(8080))为通道绑定服务端通信端口

      • selectionKey.interestOps(SelectionKey,OP_ACCEPT)通过通道对应事件监听对象设置通道的事件监听类型

  2. 源码解析

    • 上述的操作除了Selector selector =Selector.open()是在初始化NioEvemtLoopGroup实例时执行,其他的流程都在serverBootstrap().bind(8080)中完成的

    • init部分创建NioServerSocketChannel以及初始化NioServerSocketChannel中流水线的所有处理器两部分都是在主线程完成

    • registry中启动Nio事件处理线程在主线程中完成,将原生serverSocketChannel注册到选择器中注册关注的事件以及执行流水线中的处理器都在Nio事件处理线程中完成,initAndRegister方法返回一个ChannelFuture对象

    • doBind0方法会将serverSocketChannel绑定到指定端口上、并指定通道上被监听事件的类型,大部分情况下两个操作都由Nio线程异步完成

      • doBind0方法是通过向initAndRegister方法返回一个ChannelFuture对象注册写入值回调事件的方式由Nio线程来执行的,但是如果initAndRegister方法执行的足够快在else if语句块中由主线程执行的,因此大部分情况注册过程比较慢由Nio线程执行doBind0方法[这里怀疑Nio注册慢仍然使用Nio线程执行端口绑定和监听事件类型需要等通道成功注册到选择器中,如果注册比较快直接由主线程绑定端口并指定监听事件类型,如果注册比较慢为了保证主线程的执行效率通过NettyFutureaddListener回调在Nio线程执行完注册通道后由Nio线程立即执行绑定端口以及指定事件监听类型的代码,避免主线程发生同步阻塞等待]

     

EventLoopGroup



  1. NioEventLoop的组成

    • selector[NioEventLoop有两个成员变量Selector selector以及Selector unwrappedSelector]、线程[线程对应其父类的抽象父类SingleThreadEventExecutor的成员变量Thread thread,注意该抽象父类同时还有一个Executor单线程线程池类型的成员变量,线程池中的线程和成员变量thread就是同一个线程]、任务队列[同样是抽象父类SingleThreadEventExecutor中的成员变量Queue<Runnable> taskQueue,该抽象父类还有抽象父类AbstractScheduledEventExecutor,其中还有一个成员变量PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue即处理定时任务的任务队列,该队列是一个优先级队列]

      • NioEventLoop中有两个Selector是为了提高SelectorselectedKeys的遍历性能,原生NIO中的SelectorSelectedKeys是基于Set集合实现的,遍历性能没有数组好;Netty通过暴力反射将原生SelectorSelectedKeys换成了Netty自定义的基于数组的实现,提升遍历性能,同时因为部分功能必须使用基于Set的实现,因此也保留了原生的Selector

    • 以上的特征决定了NioEventLoop既可以处理IO事件,也能处理定时任务和普通任务

  2. Selector的创建时机

  3. NioEventLoopNIO线程何时被启动

    • EventLoop执行普通任务流程[SingleThreadEventExecuteexecute方法]

      • 如果任务对象为null,直接抛出空指针异常

      • inEventLoop()方法判断当前线程是否等于EventLoop中的成员变量thread,即判断当前线程是否为NIO线程

      • addTask(task)将任务加到任务队列中

      • 如果当前线程不是NIO线程,调用startThread()方法首次启动线程

        • 线程在第一次调用execute方法提交任务时执行,线程池中的NIO线程被赋值给thread变量,该任务是一个死循环任务,会不停去检查有没有IO事件,有没有普通任务,有没有定时任务获取并执行,即NIO线程在首次调用eventLoop.execute()方法时启动且只会启动一次,此后一直死循环检查有无三类任务获取并执行

      • 提交普通任务会调用selector.wakeup()唤醒或者预防NIO线程因为selector.select()导致处于阻塞状态

    • NIO线程死循环进入SelectStrategy.SELECT分支进入阻塞状态的条件

      • 当选择器上的事件个数为0时会让NIO线程进入阻塞状态

      • 选择器上的事件个数不为0会执行下面run方法中switch语句以后的代码处理选择器上的所有事件,即提交任务会唤醒阻塞的NIO线程拿到选择器上所有的IO事件一次性将所有任务全部处理掉并再次进入阻塞状态

      • 阻塞时间默认是一秒零五毫秒,当有普通或者定时任务、超时时间到了、选择器上有事件三种情况下会回到run方法继续执行业务,注意这里有任务或者有事件的判断是被其他线程通过selector.wakeup()唤醒以后做出的判断退出阻塞等待回到run方法继续处理业务,正常情况会一直阻塞到超过指定超时时间,如果被唤醒没有达到超时时间且不满足唤醒条件会继续重新计算剩余阻塞时间并继续阻塞等待,被唤醒以后重新进入循环判断是哪种情况并决定是否回到run方法继续执行业务

  4. nioEventLoop.wakeup()源码解析

    • 只有非NIO线程提交任务才会去尝试唤醒阻塞NIO线程

    • 只有成功CAS更改原子布尔变量wakeUp拿到锁的非NIO线程才能尝试去唤醒因为slector.select(timeoutMills)而陷入阻塞的NIO线程

  5. Netty修复了NIO的空轮询Bug

    • NIOselector.select()或者selector.select(timeoutMills)时,一旦发生了空轮询Bug[这个Bug发生的概率极小],这两个方法即使没有事件发生也无法阻塞当前线程,就会导致NIO线程在死循环上一直空转,如果好几个线程都同时无法阻塞在select方法上,CPU资源就会被耗尽,JDK只有在Linux平台下selector才会出现这个空轮询Bug

    • NettynioEventLoop.select(boolean oldWakeUp)方法中使用了一个局部循环计数变量selectCnt=0,在select方法中的死循环中只要循环一次就会让该计数自增1,如果selector.select()在发生空轮询Bug时失效,死循环就会瞬间循环很多次,Netty通过该循环计数变量超过指定值来判断发生了空轮询Bug,指定值SELECTOR_AUTO_REBUILD_THRESHOLD是循环次数阈值,这个值通过系统环境变量io.netty.selectorAutoRebuildThreshold来自定义设置,如果用户没有指定就使用默认值512

    • 如果Netty认为发生了空轮询Bug,认为底层JDK的实现就有问题,Netty无法修复,通过selectRebuildSelector(selectCnt);方法重新创建一个selector并替换旧的selector,里面其实很复杂,因为还涉及到旧的selector上的状态信息拷贝到新的selector上,JDK只有在Linux平台下selector才会出现这个空轮询Bug,此外Netty还设计了一个全新的Selector把旧的Selector重新实现了一遍

  6. nioEventLoop.run()中的参数ioRatio

    • else语句块中的代码会做两件事,第一件事是processSelectedKeys()去处理选择器上所有的IO事件,第二件事是runAllTasks(ioTime * (100 - ioRatio) / ioRatio);处理所有的普通任务;这里就会出现如果普通任务的处理时间太长会影响到IO事件的处理,Netty为了避免普通任务因为执行时间太长会影响到IO事件的处理,设置了一个ioRatio参量控制Nio线程处理IO事件占用时间的比例,默认参数值为50%的时间用于处理IO事件,50%的时间用户处理普通任务

    • 如果把IO事件的处理比例设置为100则会走if语句块,此时处理完所有的IO事件后会直接将所有的普通任务或者定时任务处理完才会继续处理IO事件,而不是只处理IO事件;一般情况下不要将参数ioRatio设置为100

    • 处理普通任务的耗时即(ioTime/ioRatio)*(100-ioRatio)[ioTime表示执行IO事件处理耗费的时间],运行完一个任务发现运行普通任务的时间超过该时间就不会再从任务队列中获取下一个任务而是继续处理IO事件,IO事件处理完以后继续计算处理普通任务的时间继续从任务队列获取普通任务进行处理

  7. selectedKeys优化

    • 前面源码分析替换selectorselectedKeys已经看过了,这里讲解对替换后的selectedKeys的处理过程

 

Accept流程

  1. Nio中的accept事件处理流程

    • selector.select()阻塞当前线程直到accept事件发生

    • 有事件发生触发当前线程遍历selectedKeys处理通道上的事件

    • 判断选中事件监听对象上的事件类型是否为accept事件[注意所有的accept事件都由一个事件监听对象监听,此时建立请求连接的通道还没有成功建立]

    • 有接入事件创建SocketChannel,并将该SocketChannel设置为非阻塞

    • SocketChannel注册到选择器selector上,返回时间监听对象selectionKey

    • 返回的事件监听对象关注selectionKey上的read事件

  2. Netty中的accept事件处理流程

    • NIOselector.select()EventLoopGroup章节的nioEventLoop.run()中涉及

    • NIO的遍历selectedKeys和处理accept事件在EventLoopGroup章节的nioEventLoop.processSelectedKeysOptimized()中涉及

    • 有接入事件创建SocketChannel,设置通道为非阻塞模式,封装socketChannelNioSocketChannel

    • socketChannel注册到选择器selector上,返回事件监听对象,通过pipeline.invokeHandlerAddedIfNeeded();调用用户定义的初始化器对通道进行初始化操作,比如初始化通道对应流水线上用户自定义的所有处理器

    • 配置socketChannel对应事件监听对象selectionKey关注通道上的READ事件

 

Read流程

  1. NIORead事件处理流程

    • selector.select()阻塞NIO线程直到事件发生

    • 遍历所有selectionKey判断事件类型

    • 如果为Read事件使用ByteBuf从通道读入数据并自行进行粘包半包,解析数据等处理

  2. 流程源码分析

    • 很多都是上面已经分析过的,从abstractNioByteChannel$nioByteUnsafe.read()才是读事件下的处理流程,以前的代码和serverBootstrap启动以及accept事件的处理是共用的代码

     

 

 

 

 

 

附录

  1. win10自带计算器中HEX为十六进制,DEC为十进制,BIN为8个比特位的完整字节

  2. Netty可以开发物联网

  3. Netty的核心代码一行注释都没有,但是针对用户使用的类上的说明文档还是写的很到位

  4. 使用枚举类实现接口,这种方式相较于类实现接口更简洁方便

    • 要使用其中一种实现可以直接通过byte[] ---> Serializer.Algorithm.Java.serialize(message)将消息正文序列化为byte数组,通过Message message = Serializer.Algorithm.JDK,deserialize(Message.class,bytes)byte数组反序列化为Java对象

  5. 关注一下JDK中的Properties